Python中的多进程(multiprocessing)

Python中的多线程、包括协程,由于CPython的GIL(Global interpreter Lock ,全局解释锁)问题,只能实现并发(Concurrency),不能实现并行(Parallelism)。 因此,在并行计算场景,多进程是Python最简单的选择。

Python多进程概念

概念,就是class。 了解概念,就会了解class之间的关系。

Process

与普通的多进程类似,Python多进程的核心概念是Process(进程)。 一个简单的进程使用示例如下:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p0 = Process(target=f, args=('alice',))
    p1 = Process(target=f, args=('bob',))
    p0.start()
    p1.start()
    p0.join()
    p1.join()

进程使用三部曲:

  1. 创建Process
  2. start,开始执行。
  3. join,等待执行完毕。

Pipe

Pipe即管道,是Bash中最常见的跨进程通信手段。 echo hello | tee stdout.log,中间的|就是管道,把前一个进程的stdout传递给下一个进程。

Pipe创建时,返回两个Connection,前者负责send而后者负责recv。 两个进程各执一端,就可以实现单向通信。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    receiver, sender = Pipe()
    p = Process(target=f, args=(sender,))
    p.start()
    print(receiver.recv())   # prints "[42, None, 'hello']"
    p.join()

如果在创建Pipe时,指定duplex=True,比如Pipe(True),两个Connection即可实现双向通信。 默认duplex=False

Queue

Queue是一个基于标准模块queue、包装了Pipe的类。 它不仅具有先进先出(FIFO)的特性,还能实现跨进程通信。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

在实际使用中,除了少数简单场景外,都不会直接使用ProcessPipeQueue来实现多进程。 这种低层级(low level,无贬义)的API,可读性差,容易出错。 常用的是高层级API——进程池。

Pool

由于Process创建、销毁有较大开销,并且并行数受机器CPU数量的限制,过多无益。 一个Pool(进程池)会统一创建并维持一定数量的Process,并行地执行Task。 在所有Task执行完毕后,再统一地关闭Process

这里Task(任务)的概念,并未被实现为一个class,而是一个callable,比如下面的fg

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from multiprocessing.pool import Pool


def f(x):
    return x * x


def g(x, y):
    return x**y


def main():
    with Pool(4) as pool:
        result = pool.map(f, [1, 2, 3, 4, 5])
        print(type(result))
        print(result)

    with Pool(4) as pool:
        result = pool.starmap(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)])
        print(type(result))
        print(result)


if __name__ == '__main__':
    main()

以上代码保存为multi.py文件,执行结果如下:

$ python3 multi.py
<class 'list'>
[1, 4, 9, 16, 25]
<class 'list'>
[1, 16, 243, 4096, 78125]

map的用法类似内置函数map,专门处理单个参数的callable; 而starmap则是用来处理多个参数的callable

此外,还有利用Pool执行单个Task的apply。 除非Task本身就是一个个来的,否则使用apply的效率不高。

比起apply,更值得关注的是imapimap_unorderedimapmap非常类似,而这个多出来的i,则是Iterablemap使用的是listimap则是Iterable,前者效率略高,而后者内存消耗显著的小。 在处理结果上,imap可以尽快返回一个Iterable的结果,而map则需要等待全部Task执行完毕,返回list。 无论map还是imap,都需要按顺序等待Task执行完成,而imap_unordered则不必。 imap_unordered返回的Iterable,会优先迭代到先执行完成的Task。 三者各有特点,要按需使用。

AsyncResult

以上为进程池的同步使用方案。 同步方案会卡在mapstarmap这一行,直到所有任务都执行完毕。 有时,我们会需要一个异步方案,这时就需要用到map_asyncstarmap_async。 它们返回的结果,就是AsyncResult

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from multiprocessing.pool import Pool


def f(x):
    return x * x


def g(x, y):
    return x**y


def main():
    with Pool(4) as pool:
        result = pool.map_async(f, [1, 2, 3, 4, 5])
        print(type(result))
        print(result.get())

    with Pool(4) as pool:
        result = pool.starmap_async(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)])
        print(type(result))
        print(result.get())


if __name__ == '__main__':
    main()

以上代码保存为multi_async.py文件,执行结果如下:

$ python3 multi_async.py
<class 'multiprocessing.pool.MapResult'>
[1, 4, 9, 16, 25]
<class 'multiprocessing.pool.MapResult'>
[1, 16, 243, 4096, 78125]

以上代码中,实际等待位置是result.get()那一行。

Timeout

以上多进程代码,其实是不完善的。 除非Task非常简单,并无IO、网络等资源依赖,否则多进程也好、多线程也好,都有可能执行不完。 为了避免未知原因的挂起,及时止损,通常需要设置timeoutAsyncResult在阻塞时,可以用waitget,设置timeout参数。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import time
from multiprocessing.pool import Pool, TimeoutError


def sleep(duration):
    time.sleep(duration)
    with open('result.log', 'a') as file:
        file.write(str(duration))
        file.write('\n')
    return duration


def main():
    with Pool(4) as pool:
        result = pool.map_async(sleep, range(8))
        try:
            print(result.get(timeout=5))
        except TimeoutError:
            print(TimeoutError.__name__)


if __name__ == '__main__':
    main()

以上代码保存为timeout.py文件,执行结果如下:

$ python3 timeout.py
TimeoutError
$ cat result.log
0
1
2
3
4

可以看到,由于timeout=5,4秒以前的Task都成功了,而大于(等于)5秒的Task都失败了。

get需要等待所有进程结束时,需要在Pool关闭以前。 因此,需要在with作用域中执行,否则将超时或(没设timeout)挂死。 如果使用wait,则get可以在with以外获取结果。 因此,更推荐使用wait配合get

def main():
    with Pool(4) as pool:
        result = pool.map_async(sleep, range(8))
        result.wait()
    try:
        print(result.get(9))
    except TimeoutError:
        print(TimeoutError.__name__)

替换main(),执行结果如下:

$ python3 timeout.py
[0, 1, 2, 3, 4, 5, 6, 7]

总结

前面提到Python做并行计算的选择,多进程multiprocessing只是最简单的一个选择。 另外还有两个常见选择: 一是使用其它解释器实现的Python,比如PyPy、Jython等; 二是使用C语言优化需要并行的代码,在Native层绕过GIL的限制; 三是使用协程(或线程)加subprocess,这也算是多进程的一个方案。 此外,确认代码是否真的会被GIL所影响,是首要工作。 如果代码中真正耗时的计算是在Native层执行——这在Python中非常常见,比如OpenCV——那么用多线程也没问题。

另外,要注意多进程的测试覆盖问题。 在另一个进程执行的代码,是无法被coverage确认为已覆盖的。 需要对执行内容进行单独测试,或者在程序中预留未用多进程优化的原始方案。

其实,多进程带来的额外通信、切换开销,有时候也是很明显的。 还有个问题是,主进程被杀掉后,子进程会仍然存活,这在某些场景下会产生未知问题。 所以,在机器不是很强大的场景下,用原始的单线程串行方案,是最经济实用的选择。

参考


相关笔记