多任务
同一时间执行多个任务
- 并发
- 指的是任务数多余CPU核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行
- 实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已
- 并行
- 指的是任务数小于等于CPU核数,即任务真的是一起执行的
进程
一个运行的程序或者软件,
进程是操作系统资源分配的基本单位
一个程序至少有一个进程,一个进程至少有一个线程,多进程可以完成多任务, 没有进程就没有线程
使用
import multiprocessing
import time
import os
def work():
current_process = multiprocessing.current_process()
# 查看当前进程
print('work:', current_process)
# 获取当前进程的编号
# current_process.pid
# os.getpid()
print('work的进程编号:', current_process.pid, os.getpid())
# 获取父进程的编号
print('work的父进程编号:', os.getppid())
# 函数主要代码
for i in range(10):
print('工作中...')
time.sleep(.2)
# 根据进程编号杀死对应的进程
os.kill(os.getpid(), sig)
if __name__ == '__main__':
# 查看当前进程
current_process = multiprocessing.current_process()
print('main:', current_process)
# 获取当前进程的编号
print('main的进程编号:', current_process.pid)
# 创建子进程
child_process = multiprocessing.Process(target=work)
# 启动子进程
child_process.start()
# 主进程执行打印信息的操作
for i in range(10):
print('主进程工作中...')
time.sleep(.2)
给子进程制定函数传递参数
if __name__ == '__main__':
# 创建子进程
# 1. group:进程组,目前必须使用None,一般不用设置
# 2. target:执行目标函数
# 3. name: 进程名称
# 4. args: 以元组方式给函数传参
# 5. kwargs: 以字典方式给函数传参
sub_process = multiprocessing.Process(target=show_info, name="myprocess",args=("古力娜扎", 18))
进程注意点
- 进程之间不共享全局变量
- 创建子进程其实是对主进程进行拷贝,进程之间相互独立,访问的全局变量不是同一个
- 主进程会等待所有的子进程执行完成程序再退出
- 销毁子进程的代码
if __name__ == '__main__':
# 创建子进程audioread==2.1.8
work_process = multiprocessing.Process(target=work)
# 设置守护主进程,主进程退出后子进程直接销毁,不再执行子进程中的代码
# work_process.daemon = True
work_process.start()
# 让主进程等待1秒钟
time.sleep(1)
print("主进程执行完成了啦")
# 让子进程直接销毁,表示终止执行, 主进程退出之前,把所有的子进程直接销毁就可以了
work_process.terminate()
# 总结: 主进程会等待所有的子进程执行完成以后程序再退出
进程间通信(Queue
)
Queue` 本身是消息队列程序
从队列取值使用
get
方法,向队列放入值使用put
方法
- Queue.qsize():返回当前队列包含的消息数量;
- Queue.empty():如果队列为空,返回True,反之False , 注意这个操作是不可靠的。
- Queue.full():如果队列满了,返回True,反之False;
- Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
import multiprocessing
import time
# 写入数据
def write_data(queue):
for i in range(10):
if queue.full():
print("队列满了")
break
queue.put(i)
time.sleep(0.2)
print(i)
# 读取数据
def read_data(queue):
while True:
# 加入数据从队列取完了,那么跳出循环
if queue.qsize() == 0:
print("队列空了")
break
value = queue.get()
print(value)
if __name__ == '__main__':
# 创建消息队列
queue = multiprocessing.Queue(5)
# 创建写入数据的进程
write_process = multiprocessing.Process(target=write_data, args=(queue,))
# 创建读取数据的进程
read_process = multiprocessing.Process(target=read_data, args=(queue,))
# 启动进程
write_process.start()
# 主进程等待写入进程执行完成以后代码再继续往下执行
write_process.join()
read_process.start()
进程池(Pool
)
根据任务执行情况自动创建进程,而且尽量少创建进程,合理利用进程池中的进程完成多任务
terminate()
:不管任务是否完成,立即终止;
进程池在执行任务的时候会
尽量少创建进程
,合理利用现有进程完成多任务
,这样可以减少资源开销
同步执行
等待一个进程执行结束后, 在执行后面的进程
import multiprocessing
import time
# 拷贝任务
def work():
print("复制中...", multiprocessing.current_process().pid)
time.sleep(0.5)
if __name__ == '__main__':
# 创建进程池
# 3:进程池中进程的最大个数
pool = multiprocessing.Pool(3)
# 模拟大批量的任务,让进程池去执行
for i in range(5):
# 循环让进程池执行对应的work任务
# 同步执行任务,一个任务执行完成以后另外一个任务才能执行
pool.apply(work)
异步执行
按照进程池中的进程数, 多个进程同时进行
# 进程池:池子里面放的进程,进程池会根据任务执行情况自动创建进程
# 而且尽量少创建进程,合理利用进程池中的进程完成多任务
import multiprocessing
import time
# 拷贝任务
def work():
print("复制中...", multiprocessing.current_process().pid)
# 获取当前进程的守护状态
# 提示:使用进程池创建的进程是守护主进程的状态,默认自己通过Process创建的进程是不是守住主进程的状态
# print(multiprocessing.current_process().daemon)
time.sleep(0.5)
if __name__ == '__main__':
# 创建进程池
# 3:进程池中进程的最大个数
pool = multiprocessing.Pool(3)
# 模拟大批量的任务,让进程池去执行
for i in range(5):
# 循环让进程池执行对应的work任务
# 同步执行任务,一个任务执行完成以后另外一个任务才能执行
# pool.apply(work)
# 异步执行,任务执行不会等待,多个任务一起执行
pool.apply_async(work)
# 关闭进程池,意思告诉主进程以后不会有新的任务添加进来
pool.close()
# 主进程等待进程池执行完成以后程序再退出
pool.join()
追加
2019.12.13 16:50
# 进程池的同步调用
import os, time
from multiprocessing import Pool
def work(n):
print('%s run' % os.getpid())
time.sleep(3)
return n ** 2
if __name__ == '__main__':
p = Pool(3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l = []
for i in range(10):
res = p.apply(work, args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
# 但不管该任务是否存在阻塞,同步调用都会在原地等着
print(res_l)
# 进程池的异步调用
import time
from multiprocessing import Pool, cpu_count
def work(n):
return n**2
if __name__ == '__main__':
p = Pool(cpu_count()) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l = []
for i in range(10):
res = p.apply_async(work, args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
# 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
# 而是执行完一个就释放一个进程,这个进程就去接收新的任务。
res_l.append(res)
# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果
# 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
p.close()
p.join()
for res in res_l:
print(res.get()) # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
评论区