AberSheeran
Aber Sheeran

Python asyncio

起笔自
所属文集: Python-Package
共计 5878 个字符
落笔于

asyncio 模块里给了用户三个类型,FutureTask 以及 Handle。Python 语言本身提供了 Coroutine 类型。

要理解 asyncio 这个模块,就要从这四个类型的关系入手。

Coroutine

这是一个老生常谈的类型。它就是对 Generator 类型的一种封装,在较早版本的 Python 里你可以通过一个装饰器将生成器函数转换为协程函数。

通过 .send() 函数可以驱动一个 Coroutine 执行到下一个切换点,这个切换点是层层 await 语句最终等待的 Awaitable 对象 __await__ 方法的 yield 点。

用一段代码来展示如何不使用 loop 去执行并获取一个 Coroutine 对象的返回值。可以尝试修改 while True 下的 c.send 发送的变量、yieldreturn 值并观察这段代码的输出变化。

class Awaitable:
    def __await__(self):
        x = yield "yield"
        print(f"{x=}")
        return "return"


async def f():
    print("f")
    return await Awaitable()


c = f()
count = 0

try:
    r = c.send(None)
    print(f"{count}. Send {r=}")
    count += 1

    while True:
        r = c.send(None)
        print(f"{count}. Send {r=}")
        count += 1
except StopIteration as exc:
    print("Return", exc.value)
finally:
    c.close()

既然 Coroutine 只是一个特殊的生成器对象,那通过 asyncio.new_event_loop 创建的 loop 是如何知道有哪些协程对象需要被调用 .send() 的?于是在这里我们就需要 Task 来上场了。

Task

要将一个 Coroutine 对象加入 loop,就需要使用 loop.create_task 方法,它会通过一个 Coroutine 对象创建一个 Task 对象。

Task 对象的 __step_run_and_handle_result 方法里可以看到这几行代码,让人熟悉的 .sendexc.value

coro = self._coro
try:
    if exc is None:
        result = coro.send(None)
    else:
        result = coro.throw(exc)
except StopIteration as exc:
    if self._must_cancel:
        self._must_cancel = False
        super().cancel(msg=self._cancel_message)
    else:
        super().set_result(exc.value)
......

拥有了 CoroutineTask 两个对象还不够,因为程序终究是要与网卡、磁盘或者其他设备进行交互的。在过去,往往都是采用回调函数的方式来编程,这种方式会导致代码的执行顺序不断的被各种回调函数打断,非常不利于阅读。这也是 asyncio 被创造的原因。于是就需要 Future 对象来构建 Coroutine 和现实世界的桥梁。

Future

Future 实现了 __await__ 方法让它可以被 Coroutine 使用 await 进行等待;又实现了 set_result/set_exception 方法用于回调设置返回结果。

def __await__(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self  # This tells Task to wait for completion.
    if not self.done():
        raise RuntimeError("await wasn't used with future")
    return self.result()  # May raise too.

注意到这里有这样一行代码 self._asyncio_future_blocking = True,从名字不难看出它的特殊意义。回到 Task.__step_run_and_handle_result 方法里可以看到对包含 _asyncio_future_blocking 属性的对象进行了特殊处理:如果这个属性为真,也就意味着 Future 没有被设置值,_asyncio_future_blocking 被设置为了 False,并且调用了 add_done_callback 追加回调函数,让这个 Future 在完成后调用 Task.__wakeup 来设置 Task 的结果。

def __step_run_and_handle_result(self, exc):
    ......
    result._asyncio_future_blocking = False
    result.add_done_callback(
        self.__wakeup, context=self._context)
    self._fut_waiter = result
    if self._must_cancel:
        if self._fut_waiter.cancel(
                msg=self._cancel_message):
            self._must_cancel = False
    ......

Future.add_done_task 里是这样处理回调函数的:如果 Future 已经被设置了结果,就调用 loop.call_soon,如果没有设置结果,就加入 _callbacks 列表里。而在 Futureset_resultset_exception 方法里都调用了 self.__schedule_callbacks()__schedule_callbacks 只做了一件事——让之前设置的回调函数都仅被 loop.call_soon 调用一次。

def add_done_callback(self, fn, *, context=None):
    """Add a callback to be run when the future becomes done.

    The callback is called with a single argument - the future object. If
    the future is already done when this is called, the callback is
    scheduled with call_soon.
    """
    if self._state != _PENDING:
        self._loop.call_soon(fn, self, context=context)
    else:
        if context is None:
            context = contextvars.copy_context()
        self._callbacks.append((fn, context))

def __schedule_callbacks(self):
    """Internal: Ask the event loop to call all callbacks.

    The callbacks are scheduled to be called as soon as possible. Also
    clears the callback list.
    """
    callbacks = self._callbacks[:]
    if not callbacks:
        return

    self._callbacks[:] = []
    for callback, ctx in callbacks:
        self._loop.call_soon(callback, self, context=ctx)

loop.call_soon 的功能很简单:把函数对象和参数加入 loop 的待执行队列里,并返回一个 Handle 对象。

Handle

Handle 对象是一个非常简单的对象,它类似于 functools.partial,是对函数与参数的绑定。并且提供了 cancel 函数用于指示 loop 不要执行这个 Handle

是的,到这里终于提到了 loop 执行某段代码。Handle 对象才是 loop 唯一实际接触并执行的对象。

Task 初始化函数里能看到这样一行代码,这就是 Task 被创建后立刻在 loop 里执行的原因。

def __init__(self, coro, *, loop=None, name=None, context=None,
                 eager_start=False):
    ......
    self._loop.call_soon(self.__step, context=self._context)
    ......

Loop

loop.run_forever() 里有一个 while True 不断的执行 loop._run_once()

def run_forever(self):
    """Run until stop() is called."""
    try:
        self._run_forever_setup()
        while True:
            self._run_once()
            if self._stopping:
                break
    finally:
        self._run_forever_cleanup()

通过 _run_once 的函数注释不难看出这个函数的工作就是调用各种被安排的 Handle 对象。在其中它通过 self._process_events(self._selector.select(timeout)) 来检查可以进行读写的描述符安排对应的 Handle 到可执行的队列中。

def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """
    ......

loop.run_until_completeasyncio.run 是对 loop.run_forever 的封装,通过把 loop.stop() 添加到回调函数里来达到执行完协程后跳出 run_forever 死循环的目的。

综述

Future 是对回调的封装,让回调的结果可以被 Coroutine 使用 await 等待。Task 是对 Coroutine 的封装,让一个 Coroutine 的执行过程可以被拆解为一个个 Handle 对象加入 loop 中执行。

在代码实现上 Task 继承了 Future,这对理解整个 asyncio 的结构造成了一些误会(在几年前我刚学习 asyncio 时看过很多文章吐槽 Task 继承 Future)。但其实这一继承只是为了让 Task 复用 Futureawait 方法,并不是说明 FutureTask 这两个概念本身是继承关系。

同步阻塞

很多新手在使用 asyncio 时会有一种误区,认为把原来的函数改成 async def 就获得了神奇的性能提升。但看完本文之后你应该知道,并不会。

asyncio 所有的代码依旧在同一个线程里执行,如果你的 async def 函数里没有 await,整个函数甚至会一次性全部执行完。而在你的 async def 里只要有任何一点阻塞的部分,整个 loop 就会在这儿停顿。不仅不会提升性能,反而是一个负优化。

如果你真的需要使用 asyncio 而又难以改造代码里的阻塞函数时,可以使用 asyncio.to_thread 把阻塞的同步函数丢进线程池中执行。

如果你觉得本文值得,不妨赏杯茶
asyncio 与 kafka
Python signal