Python协程进阶,原来实现一个事件循环可以如此简单

引言

目前很多公司选择将python项目使用golang重构,很大一方面原因是因为golang的并发能力,golang自带的语法糖支持使并发编程变的相对简单,也更能充分的使用多核CPU的计算资源。

相应的,python长期受制于GIL,无法在多线程时使用多核CPU,所以一直以来在谈及python的缺陷时,性能总是无法回避的一个问题。当然,一些python著名的第三方组织也一直通过各种手段来改善python的并发性能,如twisted的异步模型使用事件驱动机制来提升python性能,著名的爬虫框架scrapy便是以twisted作为底层网络库来开发的,还有gevent,它使用greenlet在用户态完成栈和上下文切换来减少切换带来的性能损耗,同样还有著名的web协程框架tornado,他使用生成器来保存协程上下文及状态,使用原生的python语法实现了协程。但从python3.4开始python引入asyncio标准库,随后又在3.5引入async/await关键字,从根本上规范了python异步编程标准,使python异步编程逐渐流行起来。

关于什么是python协程,相信网上已经有了不少资料,但是只描述抽象的上层建筑难免会让人接受困难,本文希望可以通过从最简单的代码和逻辑,使用最基础的数据结构,从实现出发,带领大家理解什么是python协程。

首先需要补充一些基础知识

什么是生成器

我们都应该听说过迭代器,这在很多语言中都有类似的概念,简单的说,迭代器就是可以被迭代的对象,对其使用next操作可以返回一个元素,通常多次迭代后迭代器会中止,此时迭代器无法再使用。比如python中可以通过iter方法来将一个列表转换成迭代器:

In [1]: lst = [1, 2, 3] 

Python学习交流群:1004391443
In [2]: iterator = iter(lst)
In [3]: next(iterator)
Out[3]: 1
In [4]: next(iterator)
Out[4]: 2
In [5]: next(iterator)
Out[5]: 3
In [6]: next(iterator)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-7-4ce711c44abc> in <module>()
----> 1 next(iterator)
StopIteration:

生成器可以看作是迭代器的子类,同时提供了比迭代器更强大的功能,python中,可以使用yield关键字使函数返回生成器对象。

In [8]: def fun():
...: yield 1
...: yield 2
...: yield 3
...:
In [9]: iterator = fun()
In [10]: next(iterator)
Out[10]: 1
In [11]: next(iterator)
Out[11]: 2
In [12]: next(iterator)
Out[12]: 3
In [13]: next(iterator)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-13-4ce711c44abc> in <module>()
----> 1 next(iterator)
StopIteration:

每次next调用, fun函数只执行四分之一,如果我们拥有多个生成器对象, 按照一定规则 可控的对他分别调用next,生成器每次的暂停都保存了执行进度和内部状态。如果将这三个生成器理解成协程,那不正是我们熟悉的协程间的切换?

事件循环

所以,我们可以想象,现在有一个循环和一个生成器列表,每次循环,我们都将所有的生成器进行一次调用,所有生成器交替执行。如下:

In [16]: gen_list = [fun(), fun(), fun()]
In [17]: while True:
...: for gen in gen_list:
...: print(next(gen))
...:
1
1
1
2
2
2
3
3
3
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-17-f2c1d557da29> in <module>()
1 while True:
2 for gen in gen_list:
----> 3 print(next(gen))
4
StopIteration:

当然,我们还可以换一种写法,将生成器的每一步都当成是一次调用,把生成器包装成一个Handle对象,每次调用handle对象的call来完成生成器的调用,同时,我们还可以在调用完成后做一些准备来控制下一次调用的时间,将Handle对应放到一个scheduled_list里面:

def fun():
print("step1")
yield
print("step2")
yield
print("step3")
yield
scheduled_list = []
class Handle(object):
def __init__(self, gen):
self.gen = gen
def call(self):
next(self.gen)
scheduled_list.append(self)
def loop(*coroutines):
scheduled_list.extend(Handle(c) for c in coroutines)
while True:
while scheduled_list:
handle = scheduled_list.pop(0)
handle.call()
if __name__ == "__main__":
loop(fun(), fun(), fun())

协程中的阻塞

在有了以上的基础后,我们来分析上面提到的切换规则,什么时候应该切换协程(生成器)?显而易见,当遇到阻塞时,我们才需要切换协程,以避免CPU的浪费。我将阻塞分为了以下三种:

  1. IO调用,如socket,file,pipe等。
  2. 人为制造的阻塞,如sleep。
  3. 异步调用。

假设,在我们的生成器内有一次socket调用,我们不知道它多久会ready,我们希望不等待它的返回,切换到其它协程运行,等其准备好之后再切换回来,该怎么办?

有同学可能会想到了,将socket注册到epoll上。如下:

import time
import socket
from functools import partial
from select import epoll
poll = epoll()
handlers = dict()
scheduled_list = []
def fun():
print("step1")
sock = socket.socket()
future = Future()
def handler():
future.set_done(sock.recv(1024))
add_handler(sock.fileno(), handler, READ)
yield future
print("step2")
yield
print("step3")
yield
def add_handler(fd, handler, events):
handlers[fd] = handler
poll.register(fd, events)
class Future(object):
def __init__(self):
self.callbacks = []
def add_callback(self, callback):
self.callbacks.append(callback)
def set_done(self, value):
self.value = value
for callback in self.callbacks:
callback()
def get_result(self):
return self.value
class Handle(object):
def __init__(self, gen):
self.gen = gen
def call(self):
yielded = next(self.gen)
if isinstance(yielded, Future):
yielded.add_callback(partial(scheduled_list.append, self))
else:
scheduled_list.append(self)
def loop(*coroutines):
scheduled_list.extend(Handle(c) for c in coroutines)
while True:
default_timeout = 10000
while scheduled_list:
handle = scheduled_list.pop(0)
handle.call()
# 等待描述符可操作
events = poll.poll(default_timeout)
while events:
fd, event = events.popitem()
handlers[fd]()
poll.unregister(fd)
del handlers[fd]
if __name__ == "__main__":
loop(fun(), fun(), fun())

这一步引入一个新的对象Future,他用来代指未来即将发生的调用,通过epoll上注册的事件,触发了它的完成,完成之后执行了将handle对象放回scheduled_list, 可从而切回了协程。

那么,人为制造的阻塞我们怎么切换协程呢?这里,我们又引入了一个新的对象Timeout

import time
import socket
from functools import partial
from select import epoll
poll = epoll()
handlers = dict()
scheduled_list = []
# 创建一个timeout_list
timeout_list = []
def fun():
print("step1")
sock = socket.socket()
future = Future()
def handler():
future.set_done()
add_handler(sock.fileno(), handler, READ)
yield future
print("step2")
yield sleep(3)
print("step3")
yield
def add_handler(fd, handler, events):
handlers[fd] = handler
poll.register(fd, events)
def sleep(sec):
future = Future()
timeout = Timeout(sec, future.set_done)
timeout_list.append(timeout)
return future
class Timeout(object):
def __init__(self, timeout, callback):
self.deadline = time.time() + timeout
self.callback = callback
def call(self):
self.callback(None)
class Future(object):
def __init__(self):
self.callbacks = []
self.value = None
def add_callback(self, callback):
self.callbacks.append(callback)
def set_done(self, value):
self.value = value
for callback in self.callbacks:
callback()
def get_result(self):
return self.value
class Handle(object):
def __init__(self, gen):
self.gen = gen
def call(self):
yielded = next(self.gen)
if isinstance(yielded, Future):
yielded.add_callback(partial(scheduled_list.append, self))
else:
scheduled_list.append(self)
def loop(*coroutines):
scheduled_list.extend(Handle(c) for c in coroutines)
while True:
default_timeout = 10000
deadline = time.time()
for timeout in timeout_list[:]:
if timeout.deadline <= deadline:
timeout.call()
timeout_list.remove(timeout)
while scheduled_list:
handle = scheduled_list.pop(0)
handle.call()
for timeout in timeout_list:
wait_time = timeout.deadline - deadline
if wait_time <= 0:
wait_time = 0
default_timeout = min(default_timeout, wait_time)
if not scheduled_list and not timeout_list and not handlers:
break
# 等待描述符可操作
events = poll.poll(default_timeout)
while events:
fd, event = events.popitem()
handlers[fd]()
poll.unregister(fd)
del handlers[fd]
if __name__ == "__main__":
loop(fun(), fun(), fun())

通过创建一个Timeout对象,我们在deadline时触发了其回调,使Future完成,从而完成了协程的切换。

由以上两点,我们可以大致观察出一个规律,创建Future对象,切出协程,在合适的时机(如socket ready或到达deadline/timeout)让他完成,切入协程,这才是协程切换的关键所在,由此,我们可以使用Future来管理各种异步调用。

如,我们在python编码时遇到了一个计算密集型的函数,由于python单进程无法利用多核,我们可以创建一个子进程来处理计算,同时关联到一个Future中:

def fun():
print("step1")
sock = socket.socket()
future = Future()
def handler():
future.set_done()
add_handler(sock.fileno(), handler, READ)
yield future
print("step2")
yield sleep(3)
print("step3")
future = Future()
from multiprocessing import Process
Process(target=long_time_call, args=(future, )).start()
yield future
def long_time_call(future):
#...
future.set_done()

当协程执行到第三步时,遇到了长时间运行的函数调用,我们创建了一个Future,关联到一个子进程中,并在子进程完成时设置future完成,在子进程完成之前,父进程已完成协程的切出,将执行权交给其它协程执行。

这个地方遗漏了一个细节,当没有其它协程可以执行时,epoll会被设置成超时时间=10000,因而陷入到长时间的睡眠中,而子进程完成后需要切入协程,但父进程已经被epoll阻塞掉,如何唤醒主进程继续执行该协程呢?业界通用的做法是,创建一个管道,在切出协程时让epoll监听读fd,子进程完成后,往管道中写入一个字符,epoll监听的读fd 马上变成ready,因此epoll解除阻塞,事件循环得以继续执行。

当然,异步调用不仅仅可以使用子进程,子线程、远程计算框架都可以通过这种方式执行。

讲到这里,大家应该基本明白了一个协程函数是如何工作的了。下表可帮助我们从线程的角度理解协程

上面的表格表述线程和协程的一一对应关系,最后一栏可能还需要举例解释一下:

我们知道一个线程执行过程中,会嵌套调用多个函数,如:

def foo():
print("in foo")


def bar():
print("in bar")
def fun():
bar()
foo()

if __name__ == "__main__":
fun()

那么生成器如何嵌套调用呢?python3.4之前,嵌套的生成器只能这么使用:

def foo():
print("in foo")
yield
def bar():
print("in bar")
yield
def fun():
for i in bar():
yield i
for i in foo():
yield i
if __name__ == "__main__":
for i in fun():
pass

python3.4之后引入了新的语法糖yield from,简化了调用方式:

def foo():
print("in foo")
yield
def bar():
print("in bar")
yield
def fun():
yield from bar()
yield from foo()

if __name__ == "__main__":
for i in fun():
pass

yield from可以驱动子生成器,来逐一返回子生成器中的值,将嵌套的生成器打平。值得一提的是,yield from才是await的真实身份。

让我们用最初的例子来编写一个嵌套了的子生成器函数的协程demo。我们将fun生成器抽离成2种阻塞操作,并封装好:

def read(sock):
future = Future()
def handler():
buf = sock.recv(1024)
future.set_done(buf)
add_handler(sock.fileno(), handler, 0x001)
yield future
return future.get_result()
def sleep(sec):
future = Future()
timeout = Timeout(sec, future.set_done)
timeout_list.append(timeout)
yield future

有了这两个基础函数之后,我们就可以自由的编写我们协程了

def coroutine(num):
client = socket.socket()
client.connect(("", 1234))
print(f"coroutine_{num} start")
buffer = yield from read(client)
print(f"coroutine_{num} recv: ", buffer)
yield from sleep(3)
print(f"coroutine_{num} wake up ")
client.close()
if __name__ == "__main__":
loop(coroutine(1), coroutine(2))

我们创建了两个协程,其中调用了一次socket读和一个睡眠,让我们看一下执行效果:

coroutine_1 start
coroutine_2 start
coroutine_2 recv: b'test'
coroutine_1 recv: b'test'
coroutine_2 wake up
coroutine_1 wake up

两个协程异步交替执行。

asyncio的使用

相信看完上面的例子之后,大家应该对python协程的实现有了初步的认识,那标准的python协程如何使用呢?

import socket
import asyncio
async def read(sock):
loop = asyncio.get_event_loop()
future = loop.create_future()
def handler():
buf = sock.recv(1024)
future.set_result(buf)
loop.remove_reader(sock.fileno())
loop.add_reader(sock.fileno(), handler)
await future
return future.result()
async def coroutine(num):
client = socket.socket()
client.connect(("", 1234))
print(f"coroutine_{num} start")
buffer = await read(client)
print(f"coroutine_{num} recv: ", buffer)
await asyncio.sleep(3)
print(f"coroutine_{num} wake up ")
client.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(coroutine(1), coroutine(2)))

几乎和我们的实现一模一样,其中await取代了yield from, 协程显式使用async来声明。

python协程的应用

python协程优势

通过上述例子我们可以很容易看出,python协程具有以下特点:

  1. 超级轻量,不需要维护协程栈,所有的上下文执行状态都被维护在了生成器中。
  2. 切换自由,通过yield from(python3.5以后是await)随意切换协程,协程切换完全可控以至于几乎不用加锁。
  3. 并发能力强,并发上限理论上取决于IO多路复用可注册的文件描述符的极限。

缺点

  1. 还是只能用到单核。
  2. 由于协程切换是非抢占式的,所以如果业务是CPU密集型的,可能其它协程长时间得不到执行。

综上所述,在使用python的高并发场景下,python多进程+协程是最优的解决方案。

发表评论
留言与评论(共有 0 条评论)
   
验证码:

相关文章

推荐文章

'); })();