导读A. Jesse Jiryu Davis 是纽约 MongoDB 的工程师。他编写了异步 MongoDB Python 驱动程序 Motor,也是 MongoDB C 驱动程序的开发领袖和 PyMongo 团队成员。 他也为 asyncio 和 Tornado 做了贡献,在 http://emptysqua.re 上写作。
协程
还记得我们对你许下的承诺么?我们可以写出这样的异步代码,它既有回调方式的高效,也有多线程代码的简洁。这个结合是同过一种称为协程coroutine的模式来实现的。使用 Python3.4 标准库 asyncio 和一个叫“aiohttp”的包,在协程中获取一个网页是非常直接的( @asyncio.coroutine 修饰符并非魔法。事实上,如果它修饰的是一个生成器函数,并且没有设置 PYTHONASYNCIODEBUG 环境变量的话,这个修饰符基本上没啥用。它只是为了框架的其它部分方便,设置了一个属性 _is_coroutine 而已。也可以直接使用 asyncio 和裸生成器,而没有 @asyncio.coroutine 修饰符):
@asyncio.coroutine
? ? def fetch(self, url):
? ? ? ? response = yield from self.session.get(url)
? ? ? ? body = yield from response.read()
它也是可扩展的。在作者 Jesse 的系统上,与每个线程 50k 内存相比,一个 Python 协程只需要 3k 内存。Python 很容易就可以启动上千个协程。
协程的概念可以追溯到计算机科学的远古时代,它很简单,一个可以暂停和恢复的子过程。线程是被操作系统控制的抢占式多任务,而协程的多任务是可合作的,它们自己选择什么时候暂停去执行下一个协程。
有很多协程的实现。甚至在 Python 中也有几种。Python 3.4 标准库 asyncio 中的协程是建立在生成器之上的,这是一个 Future 类和“yield from”语句。从 Python 3.5 开始,协程变成了语言本身的特性(“PEP 492 Coroutines with async and await syntax” 中描述了 Python 3.5 内置的协程)。然而,理解 Python 3.4 中这个通过语言原有功能实现的协程,是我们处理 Python 3.5 中原生协程的基础。
要解释 Python 3.4 中基于生成器的协程,我们需要深入生成器的方方面面,以及它们是如何在 asyncio 中用作协程的。我很高兴就此写点东西,想必你也希望继续读下去。我们解释了基于生成器的协程之后,就会在我们的异步网络爬虫中使用它们。
生成器如何工作
在你理解生成器之前,你需要知道普通的 Python 函数是怎么工作的。正常情况下,当一个函数调用一个子过程,这个被调用函数获得控制权,直到它返回或者有异常发生,才把控制权交给调用者:
>>> def foo():
...? ? bar()
...
>>> def bar():
...? ? pass
标准的 Python 解释器是用 C 语言写的。一个 Python 函数被调用所对应的 C 函数是 PyEval_EvalFrameEx。它获得一个 Python 栈帧结构并在这个栈帧的上下文中执行 Python 字节码。这里是 foo 函数的字节码:
>>> import dis
>>> dis.dis(foo)
? 2? ? ? ? ? 0 LOAD_GLOBAL? ? ? ? ? ? ? 0 (bar)
? ? ? ? ? ? ? 3 CALL_FUNCTION? ? ? ? ? ? 0 (0 positional, 0 keyword pair)
? ? ? ? ? ? ? 6 POP_TOP
? ? ? ? ? ? ? 7 LOAD_CONST? ? ? ? ? ? ? 0 (None)
? ? ? ? ? ? 10 RETURN_VALUE
foo 函数在它栈中加载 bar 函数并调用它,然后把 bar 的返回值从栈中弹出,加载 None 值到堆栈并返回。
当 PyEval_EvalFrameEx 遇到 CALL_FUNCTION 字节码时,它会创建一个新的栈帧,并用这个栈帧递归的调用 PyEval_EvalFrameEx 来执行 bar 函数。
非常重要的一点是,Python 的栈帧在堆中分配!Python 解释器是一个标准的 C 程序,所以它的栈帧是正常的栈帧。但是 Python 的栈帧是在堆中处理。这意味着 Python 栈帧在函数调用结束后依然可以存在。我们在 bar 函数中保存当前的栈帧,交互式的看看这种现象:
>>> import inspect
>>> frame = None
>>> def foo():
...? ? bar()
...
>>> def bar():
...? ? global frame
...? ? frame = inspect.currentframe()
...
>>> foo()
>>> # The frame was executing the code for 'bar'.
>>> frame.f_code.co_name
'bar'
>>> # Its back pointer refers to the frame for 'foo'.
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'
Figure 5.1 - Function Calls
现在该说 Python 生成器了,它使用同样构件——代码对象和栈帧——去完成一个不可思议的任务。
这是一个生成器函数:
>>> def gen_fn():
...? ? result = yield 1
...? ? print('result of yield: {}'.format(result))
...? ? result2 = yield 2
...? ? print('result of 2nd yield: {}'.format(result2))
...? ? return 'done'
...? ?
在 Python 把 gen_fn 编译成字节码的过程中,一旦它看到 yield 语句就知道这是一个生成器函数而不是普通的函数。它就会设置一个标志来记住这个事实:
>>> # The generator flag is bit position 5.
>>> generator_bit = 1 << 5 >>> bool(gen_fn.__code__.co_flags & generator_bit)
True
当你调用一个生成器函数,Python 看到这个标志,就不会实际运行它而是创建一个生成器:
>>> gen = gen_fn()
>>> type(gen)
<class 'generator'>
Python 生成器封装了一个栈帧和函数体代码的引用:
>>> gen.gi_code.co_name
'gen_fn'
所有通过调用 gen_fn 的生成器指向同一段代码,但都有各自的栈帧。这些栈帧不再任何一个C函数栈中,而是在堆空间中等待被使用:
Figure 5.2 - Generators
栈帧中有一个指向“最后执行指令”的指针。初始化为 -1,意味着它没开始运行:
>>> gen.gi_frame.f_lasti
-1
当我们调用 send 时,生成器一直运行到第一个 yield 语句处停止,并且 send 返回 1,因为这是 gen 传递给 yield 表达式的值。
>>> gen.send(None)
1
现在,生成器的指令指针是 3,所编译的Python 字节码一共有 56 个字节:
>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56
这个生成器可以在任何时候、任何函数中恢复运行,因为它的栈帧并不在真正的栈中,而是堆中。在调用链中它的位置也是不固定的,它不必遵循普通函数先进后出的顺序。它像云一样自由。
我们可以传递一个值 hello 给生成器,它会成为 yield 语句的结果,并且生成器会继续运行到第二个 yield 语句处。
>>> gen.send('hello')
result of yield: hello
2
现在栈帧中包含局部变量 result:
>>> gen.gi_frame.f_locals
{'result': 'hello'}
其它从 gen_fn 创建的生成器有着它自己的栈帧和局部变量。
当我们再一次调用 send,生成器继续从第二个 yield 开始运行,以抛出一个特殊的 StopIteration 异常为结束。
>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):
? File "input", line 1, in
StopIteration: done
这个异常有一个值 "done",它就是生成器的返回值。
使用生成器构建协程
所以生成器可以暂停,可以给它一个值让它恢复,并且它还有一个返回值。这些特性看起来很适合去建立一个不使用那种乱糟糟的意面似的回调异步编程模型。我们想创造一个这样的“协程”:一个在程序中可以和其他过程合作调度的过程。我们的协程将会是标准库 asyncio 中协程的一个简化版本,我们将使用生成器,futures 和 yield from 语句。
首先,我们需要一种方法去代表协程所需要等待的 future 事件。一个简化的版本是:
class Future:
? ? def __init__(self):
? ? ? ? self.result = None
? ? ? ? self._callbacks = []
? ? def add_done_callback(self, fn):
? ? ? ? self._callbacks.append(fn)
? ? def set_result(self, result):
? ? ? ? self.result = result
? ? ? ? for fn in self._callbacks:
? ? ? ? ? ? fn(self)
一个 future 初始化为“未解决的”,它通过调用 set_result 来“解决”。(这个 future 缺少很多东西,比如说,当这个 future 解决后,生成yield的协程应该马上恢复而不是暂停,但是在我们的代码中却不没有这样做。参见 asyncio 的 Future 类以了解其完整实现。)
让我们用 future 和协程来改写我们的 fetcher。我们之前用回调写的 fetch 如下:
class Fetcher:
? ? def fetch(self):
? ? ? ? self.sock = socket.socket()
? ? ? ? self.sock.setblocking(False)
? ? ? ? try:
? ? ? ? ? ? self.sock.connect(('xkcd.com', 80))
? ? ? ? except BlockingIOError:
? ? ? ? ? ? pass
? ? ? ? selector.register(self.sock.fileno(),
? ? ? ? ? ? ? ? ? ? ? ? ? EVENT_WRITE,
? ? ? ? ? ? ? ? ? ? ? ? ? self.connected)
? ? def connected(self, key, mask):
? ? ? ? print('connected!')
? ? ? ? # And so on....
fetch 方法开始连接一个套接字,然后注册 connected 回调函数,它会在套接字建立连接后调用。现在我们使用协程把这两步合并:
def fetch(self):
? ? ? ? sock = socket.socket()
? ? ? ? sock.setblocking(False)
? ? ? ? try:
? ? ? ? ? ? sock.connect(('xkcd.com', 80))
? ? ? ? except BlockingIOError:
? ? ? ? ? ? pass
? ? ? ? f = Future()
? ? ? ? def on_connected():
? ? ? ? ? ? f.set_result(None)
? ? ? ? selector.register(sock.fileno(),
? ? ? ? ? ? ? ? ? ? ? ? ? EVENT_WRITE,
? ? ? ? ? ? ? ? ? ? ? ? ? on_connected)
? ? ? ? yield f
? ? ? ? selector.unregister(sock.fileno())
? ? ? ? print('connected!')
现在,fetch 是一个生成器,因为它有一个 yield 语句。我们创建一个未决的 future,然后 yield 它,暂停 fetch 直到套接字连接建立。内联函数 on_connected 解决这个 future。
但是当 future 被解决,谁来恢复这个生成器?我们需要一个协程驱动器。让我们叫它 “task”:
class Task:
? ? def __init__(self, coro):
? ? ? ? self.coro = coro
? ? ? ? f = Future()
? ? ? ? f.set_result(None)
? ? ? ? self.step(f)
? ? def step(self, future):
? ? ? ? try:
? ? ? ? ? ? next_future = self.coro.send(future.result)
? ? ? ? except StopIteration:
? ? ? ? ? ? return
? ? ? ? next_future.add_done_callback(self.step)
# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
Task(fetcher.fetch())
loop()
task 通过传递一个 None 值给 fetch 来启动它。fetch 运行到它 yeild 出一个 future,这个 future 被作为 next_future 而捕获。当套接字连接建立,事件循环运行回调函数 on_connected,这里 future 被解决,step 被调用,fetch 恢复运行。
用 yield from 重构协程
一旦套接字连接建立,我们就可以发送 HTTP GET 请求,然后读取服务器响应。不再需要哪些分散在各处的回调函数,我们把它们放在同一个生成器函数中:
def fetch(self):
? ? ? ? # ... connection logic from above, then:
? ? ? ? sock.send(request.encode('ascii'))
? ? ? ? while True:
? ? ? ? ? ? f = Future()
? ? ? ? ? ? def on_readable():
? ? ? ? ? ? ? ? f.set_result(sock.recv(4096))
? ? ? ? ? ? selector.register(sock.fileno(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? EVENT_READ,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? on_readable)
? ? ? ? ? ? chunk = yield f
? ? ? ? ? ? selector.unregister(sock.fileno())
? ? ? ? ? ? if chunk:
? ? ? ? ? ? ? ? self.response += chunk
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? # Done reading.
? ? ? ? ? ? ? ? break
从套接字中读取所有信息的代码看起来很通用。我们能不把它从 fetch 中提取成一个子过程?现在该 Python 3 热捧的 yield from 登场了。它能让一个生成器委派另一个生成器。
让我们先回到原来那个简单的生成器例子:?
>>> def gen_fn():
...? ? result = yield 1
...? ? print('result of yield: {}'.format(result))
...? ? result2 = yield 2
...? ? print('result of 2nd yield: {}'.format(result2))
...? ? return 'done'
...
为了从其他生成器调用这个生成器,我们使用 yield from 委派它:
>>> # Generator function:
>>> def caller_fn():
...? ? gen = gen_fn()
...? ? rv = yield from gen
...? ? print('return value of yield-from: {}'
...? ? ? ? ? .format(rv))
...
>>> # Make a generator from the
>>> # generator function.
>>> caller = caller_fn()
这个 caller 生成器的行为的和它委派的生成器 gen 表现的完全一致:
>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti? # Hasn't advanced.
15
>>> caller.send('goodbye')
result of 2nd yield: goodbye
return value of yield-from: done
Traceback (most recent call last):
? File "input", line 1, in
StopIteration
当 caller 自 gen 生成(yield),caller 就不再前进。注意到 caller 的指令指针保持15不变,就是 yield from 的地方,即使内部的生成器 gen 从一个 yield 语句运行到下一个 yield,它始终不变。(事实上,这就是“yield from”在 CPython 中工作的具体方式。函数会在执行每个语句之前提升其指令指针。但是在外部生成器执行“yield from”后,它会将其指令指针减一,以保持其固定在“yield form”语句上。然后其生成其 caller。这个循环不断重复,直到内部生成器抛出 StopIteration,这里指向外部生成器最终允许它自己进行到下一条指令的地方。)从 caller 外部来看,我们无法分辨 yield 出的值是来自 caller 还是它委派的生成器。而从 gen 内部来看,我们也不能分辨传给它的值是来自 caller 还是 caller 的外面。yield from 语句是一个光滑的管道,值通过它进出 gen,一直到 gen 结束。
协程可以用 yield from 把工作委派给子协程,并接收子协程的返回值。注意到上面的 caller 打印出“return value of yield-from: done”。当 gen 完成后,它的返回值成为 caller 中 yield from 语句的值。
rv = yield from gen
前面我们批评过基于回调的异步编程模式,其中最大的不满是关于 “堆栈撕裂stack ripping”:当一个回调抛出异常,它的堆栈回溯通常是毫无用处的。它只显示出事件循环运行了它,而没有说为什么。那么协程怎么样?
>>> def gen_fn():
...? ? raise Exception('my error')
>>> caller = caller_fn()
>>> caller.send(None)
Traceback (most recent call last):
? File "input", line 1, in
? File "input", line 3, in caller_fn
? File "input", line 2, in gen_fn
Exception: my error
这还是非常有用的,当异常抛出时,堆栈回溯显示出 caller_fn 委派了 gen_fn。令人更欣慰的是,你可以在一次异常处理器中封装这个调用到一个子过程中,像正常函数一样:
>>> def gen_fn():
...? ? yield 1
...? ? raise Exception('uh oh')
...
>>> def caller_fn():
...? ? try:
...? ? ? ? yield from gen_fn()
...? ? except Exception as exc:
...? ? ? ? print('caught {}'.format(exc))
...
>>> caller = caller_fn()
>>> caller.send(None)
1
>>> caller.send('hello')
caught uh oh
所以我们可以像提取子过程一样提取子协程。让我们从 fetcher 中提取一些有用的子协程。我们先写一个可以读一块数据的协程 read:
def read(sock):
? ? f = Future()
? ? def on_readable():
? ? ? ? f.set_result(sock.recv(4096))
? ? selector.register(sock.fileno(), EVENT_READ, on_readable)
? ? chunk = yield f? # Read one chunk.
? ? selector.unregister(sock.fileno())
? ? return chunk
在 read 的基础上,read_all 协程读取整个信息:
def read_all(sock):
? ? response = []
? ? # Read whole response.
? ? chunk = yield from read(sock)
? ? while chunk:
? ? ? ? response.append(chunk)
? ? ? ? chunk = yield from read(sock)
? ? return b''.join(response)
如果你换个角度看,抛开 yield form 语句的话,它们就像在做阻塞 I/O 的普通函数一样。但是事实上,read 和 read_all 都是协程。yield from read 暂停 read_all 直到 I/O 操作完成。当 read_all 暂停时,asyncio 的事件循环正在做其它的工作并等待其他的 I/O 操作。read 在下次循环中当事件就绪,完成 I/O 操作时,read_all 恢复运行。
最终,fetch 调用了 read_all:
class Fetcher:
? ? def fetch(self):
? ? ? ? # ... connection logic from above, then:
? ? ? ? sock.send(request.encode('ascii'))
? ? ? ? self.response = yield from read_all(sock)
神奇的是,Task 类不需要做任何改变,它像以前一样驱动外部的 fetch 协程:
Task(fetcher.fetch())
loop()
当 read yield 一个 future 时,task 从 yield from 管道中接收它,就像这个 future 直接从 fetch yield 一样。当循环解决一个 future 时,task 把它的结果送给 fetch,通过管道,read 接受到这个值,这完全就像 task 直接驱动 read 一样:
Figure 5.3 - Yield From
为了完善我们的协程实现,我们再做点打磨:当等待一个 future 时,我们的代码使用 yield;而当委派一个子协程时,使用 yield from。不管是不是协程,我们总是使用 yield form 会更精炼一些。协程并不需要在意它在等待的东西是什么类型。
在 Python 中,我们从生成器和迭代器的高度相似中获得了好处,将生成器进化成 caller,迭代器也可以同样获得好处。所以,我们可以通过特殊的实现方式来迭代我们的 Future 类:
# Method on Future class.
? ? def __iter__(self):
? ? ? ? # Tell Task to resume me here.
? ? ? ? yield self
? ? ? ? return self.result
future 的 __iter__ 方法是一个 yield 它自身的一个协程。当我们将代码替换如下时:
# f is a Future.
yield f
以及……:
# f is a Future.
yield from f
……结果是一样的!驱动 Task 从它的调用 send 中接收 future,并当 future 解决后,它发回新的结果给该协程。
在每个地方都使用 yield from 的好处是什么?为什么比用 field 等待 future 并用 yield from 委派子协程更好?之所以更好的原因是,一个方法可以自由地改变其实行而不影响到其调用者:它可以是一个当 future 解决后返回一个值的普通方法,也可以是一个包含 yield from 语句并返回一个值的协程。无论是哪种情况,调用者仅需要 yield from 该方法以等待结果就行。
亲爱的读者,我们已经完成了对 asyncio 协程探索。我们深入观察了生成器的机制,实现了简单的 future 和 task。我们指出协程是如何利用两个世界的优点:比线程高效、比回调清晰的并发 I/O。当然真正的 asyncio 比我们这个简化版本要复杂的多。真正的框架需要处理zero-copy I/0、公平调度、异常处理和其他大量特性。
使用 asyncio 编写协程代码比你现在看到的要简单的多。在前面的代码中,我们从基本原理去实现协程,所以你看到了回调,task 和 future,甚至非阻塞套接字和 select 调用。但是当用 asyncio 编写应用,这些都不会出现在你的代码中。我们承诺过,你可以像这样下载一个网页:
@asyncio.coroutine
? ? def fetch(self, url):
? ? ? ? response = yield from self.session.get(url)
? ? ? ? body = yield from response.read()
对我们的探索还满意么?回到我们原始的任务:使用 asyncio 写一个网络爬虫。