作为Python程序员,平时很少使用并发编程,偶尔使用也只需要派生出一批独立的线程,然后放到队列中,批量执行。所以,不夸张的说,虽然我知道线程、进程、并行、并发的概念,但每次使用的时候可能还需要再打开文档回顾一下。
现在这一篇还是 《流畅的python》读书笔记,译者在这里把future 翻译为“期物”,我觉得不太合适,既然future不能找到一个合适的词汇,暂时还是直接使用 future 吧。
future 是一种对象,表示异步执行的操作。这个概念是 concurrent.futures模块和asyncio包的基础。
concurrent.futures 模块是Python3.2 引入的,对于Python2x 版本,Python2.5 以上的版本可以安装 futures 包来使用这个模块。
从Python3.4起,标准库中有两个为Future的类:concurrent.futures.Future 和 asyncio.Future。这两个类作用相同:两个Future类的实例都表示可能已经完成或未完成的延迟计算。
Future 封装待完成的操作,可放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果(或异常)。
我们知道,如果程序中包含I/O操作,程序会有很高的延迟,CPU会处于等待状态,这时如果我们不使用并发会浪费很多时间。
我们先举个例子:
下边是有两段代码,主要功能都是从网上下载人口前20的国际的国旗:
第一段代码(flagss.py)是依序下载:下载完一个图片后保存到硬盘,然后请求下一张图片;
第二段代码(flagss_threadpool.py)使用 concurrent.futures 模块,批量下载10张图片。
运行分别运行两段代码3次,结果如下:
images.py 的结果如下
可以看到,依次下载10张图片,平均需要6秒
flags_threadpool.py 的结果如下:
使用 concurrent.futures 后,下载10张图片平均需要2秒
通过上边的结果我们发现使用 concurrent.futures 后,下载效率大幅提升。
下边我们来看下这两段代码。
同步执行的代码flags.py:
使用 concurrent.future 并发的代码 flags_threadpool.py
上边的代码,我们对 concurrent.futures 的使用有了大致的了解。但 future 在哪里呢,我们并没有看到。
Future 是 concurrent.futures 模块和 asyncio 包的重要组件。从Python3.4起,标准库中有两个为Future的类:concurrent.futures.Future 和 asyncio.Future。这两个Future作用相同。
Future 封装待完成的操作,可放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果(或异常)。
Future 表示终将发生的事情,而确定某件事情会发生的唯一方式是执行的时间已经排定。因此只有把某件事交给 concurrent.futures.Executor 子类处理时,才会创建 concurrent.futures.Future 实例。
例如,调用Executor.submit() 方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个Future。
Future 有三个重要的方法:
asyncio.Future.result 方法不支持设定超时时间,如果想获取 Future 的结果,可以使用 yield from 结构
为了加深对 Future 的理解,现在我们修改下 flags_threadpool.py download_many 函数。
现在执行代码,运行结果如下:
从结果可以看到,future 的 repr() 方法会显示状态,前三个 是running 是因为我们设定了三个进程,所以后两个是pendding 状态。如果将max_workers参数设置为5,结果就会全都是 running。
虽然,使用 future 的脚步比第一个脚本的执行速度快了很多,但由于受GIL的限制,下载并不是并行的。
CPython 解释器本身不是线程安全的,因此解释器被一个全局解释器锁保护着,它确保任何时候都只有一个Python线程执行。
然而,Python标准库中所有执行阻塞型I/O操作的函数,在等待系统返回结果时都会释放GIL。这意味着I/O密集型Python程序能从中受益:一个Python线程等待网络响应时,阻塞型I/O函数会释放GIL,再运行一个线程。
Python 标准库中所有阻塞型I/O函数都会释放GIL,允许其他线程运行。time.sleep()函数也会释放GIL。
那么如何在CPU密集型作业中使用 concurrent.futures 模块绕开GIL呢?
答案是 使用 ProcessPoolExecutor 类。
使用这个模块可以在做CPU密集型工作是绕开GIL,利用所有可用核心。
ThreadPoolExecutor 和 ProcessPoolExecutor 都实现了通用的 Executor 接口,所以,我们可以轻松的将基于线程的方案改为使用进程的方案。
比如下边这样:
需要注意的是,ThreadPoolExecutor 需要指定 max_workers 参数,
而 ProcessPoolExecutor 的这个参数是可选的默认值是 os.cup_count()(计算机cpu核心数)。
ProcessPoolExecutor 的价值主要体现在CPU密集型作业上。
使用Python处理CPU密集型工作,应该试试PyPy,会有更高的执行速度。
现在我们回到开始的代码,看下 Executor.map 函数。
文档中对map函数的介绍如下。
map(func, *iterables, timeout=None, chunksize=1)
等同于 map(func, *iterables),不同的是 func 是异步执行的,并且可以同时进行对 func 的多个调用。如果调用 next(),则返回的迭代器提出 concurrent.futures.TimeoutError,并且在从 Executor.map() 的原始调用起的 timeout 秒之后结果不可用。 timeout 可以是int或float。如果未指定 timeout 或 None,则等待时间没有限制。如果调用引发异常,那么当从迭代器检索其值时,将引发异常。当使用 ProcessPoolExecutor 时,此方法将 iterables 分成多个块,它作为单独的任务提交到进程池。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。对于非常长的迭代,与默认大小1相比,使用大值 chunksize 可以显着提高性能。使用 ThreadPoolExecutor,chunksize 没有效果。
在 3.5 版更改: 添加了 chunksize 参数。
Executor.map 还有个特性比较有用,那就是这个函数返回结果的顺序于调用开始的顺序是一致的。如果第一个调用称其结果用时10秒,其他调用只用1秒,代码会阻塞10秒,获取map方法返回的生成器产出的第一个结果。
如果不是获取到所有结果再处理,通常会使用 Executor.submit + Executor.as_completed 组合使用的方案。
Executor.submit + Executor.as_completed 这个组合更灵活,因为submit方法能处理不同的可调用对象和参数,而executor.map 只能处理参数不同的同一个可调用对象。此外,传给futures.as_completed 函数的期物集合可以来自不同的 Executor 实例。
futures 有三个异常类:
我们先看一下,future.result() 出现异常的处理情况。代码改动如下:
现在执行代码,会发现 download_one 中的异常传递到了download_many 中,并且导致抛出了异常,未执行完的其它future 也都中断。
为了能保证其它没有错误的future 可以正常执行,这里我们需要对future.result() 做异常处理。
改动结果如下:
这里我们用到了一个对 futures.as_completed 函数特别有用的惯用法:构建一个字典,把各个future映射到其他数据(future运行结束后可能用的)上。这样,虽然 future生成的顺序虽然已经乱了,依然便于使用结果做后续处理。
一篇写完了没有总结总感觉少点什么,所以。
Python 自 0.9.8 版就支持线程了,concurrent.futures 只不过是使用线程的最新方式。
futures.ThreadPoolExecutor 类封装了 threading 模块的组件,使使用线程变得更加方便。
顺便再推荐一下 《流畅的python》,绝对值得一下。
下一篇笔记应该是使用 asyncio 处理并发。
留言与评论(共有 0 条评论) |