Python中的多进程(multiprocessing)
2018-11-21 22:22:14 +08 字数:2530 标签: PythonPython中的多线程、包括协程,由于CPython的GIL(Global interpreter Lock ,全局解释锁)问题,只能实现并发(Concurrency),不能实现并行(Parallelism)。 因此,在并行计算场景,多进程是Python最简单的选择。
Python多进程概念 ¶
概念,就是class
。
了解概念,就会了解class
之间的关系。
Process ¶
与普通的多进程类似,Python多进程的核心概念是Process(进程)。 一个简单的进程使用示例如下:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p0 = Process(target=f, args=('alice',))
p1 = Process(target=f, args=('bob',))
p0.start()
p1.start()
p0.join()
p1.join()
进程使用三部曲:
- 创建Process。
start
,开始执行。join
,等待执行完毕。
Pipe ¶
Pipe即管道,是Bash中最常见的跨进程通信手段。
echo hello | tee stdout.log
,中间的|
就是管道,把前一个进程的stdout传递给下一个进程。
Pipe创建时,返回两个Connection,前者负责send而后者负责recv。 两个进程各执一端,就可以实现单向通信。
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
receiver, sender = Pipe()
p = Process(target=f, args=(sender,))
p.start()
print(receiver.recv()) # prints "[42, None, 'hello']"
p.join()
如果在创建Pipe时,指定duplex=True
,比如Pipe(True)
,两个Connection即可实现双向通信。
默认duplex=False
。
Queue ¶
Queue是一个基于标准模块queue、包装了Pipe的类。 它不仅具有先进先出(FIFO)的特性,还能实现跨进程通信。
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
在实际使用中,除了少数简单场景外,都不会直接使用Process、Pipe、Queue来实现多进程。 这种低层级(low level,无贬义)的API,可读性差,容易出错。 常用的是高层级API——进程池。
Pool ¶
由于Process创建、销毁有较大开销,并且并行数受机器CPU数量的限制,过多无益。 一个Pool(进程池)会统一创建并维持一定数量的Process,并行地执行Task。 在所有Task执行完毕后,再统一地关闭Process。
这里Task(任务)的概念,并未被实现为一个class
,而是一个callable
,比如下面的f
、g
。
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from multiprocessing.pool import Pool
def f(x):
return x * x
def g(x, y):
return x**y
def main():
with Pool(4) as pool:
result = pool.map(f, [1, 2, 3, 4, 5])
print(type(result))
print(result)
with Pool(4) as pool:
result = pool.starmap(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)])
print(type(result))
print(result)
if __name__ == '__main__':
main()
以上代码保存为multi.py
文件,执行结果如下:
$ python3 multi.py
<class 'list'>
[1, 4, 9, 16, 25]
<class 'list'>
[1, 16, 243, 4096, 78125]
map的用法类似内置函数map
,专门处理单个参数的callable
;
而starmap则是用来处理多个参数的callable
。
此外,还有利用Pool执行单个Task的apply。 除非Task本身就是一个个来的,否则使用apply的效率不高。
比起apply,更值得关注的是imap和imap_unordered。
imap和map非常类似,而这个多出来的i
,则是Iterable
。
map使用的是list
而imap则是Iterable
,前者效率略高,而后者内存消耗显著的小。
在处理结果上,imap可以尽快返回一个Iterable
的结果,而map则需要等待全部Task执行完毕,返回list
。
无论map还是imap,都需要按顺序等待Task执行完成,而imap_unordered则不必。
imap_unordered返回的Iterable
,会优先迭代到先执行完成的Task。
三者各有特点,要按需使用。
AsyncResult ¶
以上为进程池的同步使用方案。 同步方案会卡在map或starmap这一行,直到所有任务都执行完毕。 有时,我们会需要一个异步方案,这时就需要用到map_async或starmap_async。 它们返回的结果,就是AsyncResult。
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from multiprocessing.pool import Pool
def f(x):
return x * x
def g(x, y):
return x**y
def main():
with Pool(4) as pool:
result = pool.map_async(f, [1, 2, 3, 4, 5])
print(type(result))
print(result.get())
with Pool(4) as pool:
result = pool.starmap_async(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)])
print(type(result))
print(result.get())
if __name__ == '__main__':
main()
以上代码保存为multi_async.py
文件,执行结果如下:
$ python3 multi_async.py
<class 'multiprocessing.pool.MapResult'>
[1, 4, 9, 16, 25]
<class 'multiprocessing.pool.MapResult'>
[1, 16, 243, 4096, 78125]
以上代码中,实际等待位置是result.get()
那一行。
Timeout ¶
以上多进程代码,其实是不完善的。
除非Task非常简单,并无IO、网络等资源依赖,否则多进程也好、多线程也好,都有可能执行不完。
为了避免未知原因的挂起,及时止损,通常需要设置timeout
。
AsyncResult在阻塞时,可以用wait或get,设置timeout
参数。
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import time
from multiprocessing.pool import Pool, TimeoutError
def sleep(duration):
time.sleep(duration)
with open('result.log', 'a') as file:
file.write(str(duration))
file.write('\n')
return duration
def main():
with Pool(4) as pool:
result = pool.map_async(sleep, range(8))
try:
print(result.get(timeout=5))
except TimeoutError:
print(TimeoutError.__name__)
if __name__ == '__main__':
main()
以上代码保存为timeout.py
文件,执行结果如下:
$ python3 timeout.py
TimeoutError
$ cat result.log
0
1
2
3
4
可以看到,由于timeout=5
,4秒以前的Task都成功了,而大于(等于)5秒的Task都失败了。
当get需要等待所有进程结束时,需要在Pool关闭以前。
因此,需要在with
作用域中执行,否则将超时或(没设timeout
)挂死。
如果使用wait,则get可以在with
以外获取结果。
因此,更推荐使用wait配合get。
def main():
with Pool(4) as pool:
result = pool.map_async(sleep, range(8))
result.wait()
try:
print(result.get(9))
except TimeoutError:
print(TimeoutError.__name__)
替换main()
,执行结果如下:
$ python3 timeout.py
[0, 1, 2, 3, 4, 5, 6, 7]
总结 ¶
前面提到Python做并行计算的选择,多进程multiprocessing只是最简单的一个选择。 另外还有两个常见选择: 一是使用其它解释器实现的Python,比如PyPy、Jython等; 二是使用C语言优化需要并行的代码,在Native层绕过GIL的限制; 三是使用协程(或线程)加subprocess,这也算是多进程的一个方案。 此外,确认代码是否真的会被GIL所影响,是首要工作。 如果代码中真正耗时的计算是在Native层执行——这在Python中非常常见,比如OpenCV——那么用多线程也没问题。
另外,要注意多进程的测试覆盖问题。 在另一个进程执行的代码,是无法被coverage确认为已覆盖的。 需要对执行内容进行单独测试,或者在程序中预留未用多进程优化的原始方案。
其实,多进程带来的额外通信、切换开销,有时候也是很明显的。 还有个问题是,主进程被杀掉后,子进程会仍然存活,这在某些场景下会产生未知问题。 所以,在机器不是很强大的场景下,用原始的单线程串行方案,是最经济实用的选择。