python的多进程使用
参考:
强烈推荐这个 【Multiprocessing系列】Multiprocessing基础,清晰易懂,尤其推荐里面的两篇资源共享系列 【Multiprocessing系列】共享资源 【Multiprocessing系列】子进程返回值,其中后者更通用,因为前者里面的方法Windows不适用
简单的模版可以参考正确使用 Multiprocessing 的姿势
一文看懂Python多进程与多线程编程(工作学习面试必读) 开篇关于线程、进程的说明很清晰
一篇文章搞定Python多进程(全) 可以参考里面的例子。特别是最后一个关于map的例子
莫凡的例子也不错 进程池 Pool
基础概念
一些api
- 创建Process相关的:
multiprocessing.Process()
- 可以直接调用默认的构造函数,创建新的进程
- 也可以继承
Process
,像Thread的使用那样 .join()
方法用来阻塞主进程(多线程时也是一样的含义),**即某子进程调用.join()
方法,则主进程会一直等到该子进程介绍,或者到达设定的时间,才会继续执行join
后面的代码。详参 python多进程的理解 multiprocessing Process join run**:
- 进程池 Pool,设定好进程数,避免了手动管理的麻烦,这个很重要:
- 顺序执行任务(串行):
- apply
- map(类似map的用法,输入一个可迭代的变量即可),但最后的输出不同于类似map(无需在外面套一个list),而是直接返回一个list
- 顺序执行任务(串行):
# coding=utf-8
from multiprocessing import Pool
import multiprocessing
import time
def task(pid):
# do something
result = str(pid)
return result
def main():
# multiprocessing.freeze_support()
cpus = multiprocessing.cpu_count()
pool = multiprocessing.Pool(cpus)
results = []
rslt = pool.map(task, range(10))
pool.close()
pool.join()
# 输出['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
print(rslt)
if __name__ == '__main__':
main()
异步执行(并行),注意返回的并不是list或者最后的结果,而是
multiprocessing.pool.AsyncResult
,需要再调用get
(还有一些别的方法):- apply_async
- map_async,同样类似上面的的map,只是返回的是AsyncResult,需要再get获得最后的结果
# coding=utf-8 from multiprocessing import Pool import multiprocessing import time from time import sleep def task(pid): # do something result = str(pid) if pid % 2 == 0: sleep(5) return result def main(): # multiprocessing.freeze_support() cpus = multiprocessing.cpu_count() pool = multiprocessing.Pool(cpus) results = [] rslt = pool.map_async(task, range(10)) pool.close() pool.join() # 注意,只有当所有进程都结束之后才会执行下面的print,因此上面task中的sleep无效 # 结果:['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] print(rslt.get()) if __name__ == '__main__': main()
通信,数据共享相关的:Value,Array(这两个都只能用c的数据类型,不推荐),Queue(最常用的),Manager
基本的思路
- 创建并开启多个进程:
- 可以类似Thread那样,通过循环创建并start
- 或者通过进程池Pool创建,就不需要通过start
- (如果是Pool,关闭进程池
Pool.close
) - join,等所有的子进程结束,处理结果
一些模版
用Process创建进程,并用shared variable收集结果(略复杂)
def worker(procnum, return_dict):
'''worker function'''
print str(procnum) + ' represent!'
return_dict[procnum] = procnum
if __name__ == '__main__':
manager = Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
# 最后的结果是多个进程返回值的集合
print return_dict.values()
- Manager支持
list
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Queue
,Value
andArray
.
来自官方的例子 Sharing state between processes
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print d
print l
创建进程池Pool,使用map_async或者apply_async,用list把所有结果收集起来
apply_async
:
def task(pid):
# do something
return result
def main():
pool = multiprocessing.Pool()
cpus = multiprocessing.cpu_count()
results = []
for i in xrange(0, cpus):
result = pool.apply_async(task, args=(i,))
results.append(result)
pool.close()
pool.join()
for result in results:
print(result.get())
map_async
:
# coding=utf-8
from multiprocessing import Pool
import multiprocessing
import time
from time import sleep
def task(pid):
# do something
result = str(pid)
if pid % 2 == 0:
sleep(2)
return result
def main():
# multiprocessing.freeze_support()
cpus = multiprocessing.cpu_count()
pool = multiprocessing.Pool(cpus)
results = []
rslt = pool.map_async(task, range(10))
pool.close()
pool.join()
print(rslt.get())
if __name__ == '__main__':
main()
Pool.map/ Pool.map_async的实践
这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = \'thumbs\'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if \'jpeg\' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == \'__main__\':
folder = os.path.abspath(
\'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
pool = Pool()
pool.map(creat_thumbnail, images) #关键点,images是一个可迭代对象
pool.close()
pool.join()
一些tricks
Pool.map/Pool.map_async传入多个参数
通过传入list of list(or anything suitable)
partial
通过ctrl-c结束进程池
大体思路:替换系统的signal int
Catch Ctrl+C / SIGINT and exit multiprocesses gracefully in python [duplicate]
但是这个是不行的,实测不行
实测应该用Python 中 Ctrl+C 不能终止 Multiprocessing Pool 的解决方案