这是用户在 2025-8-5 23:16 为 https://sailor.li/asyncio?utm_source=www.pythonweekly.com&utm_medium=newsletter&utm_campaign=python-... 保存的双语快照页面,由 沉浸式翻译 提供双语支持。了解如何保存?

asyncio: a library with too many sharp corners
asyncio:一个有太多尖锐问题的库

Red garden tulip (Tulipa gesneriana)
A red garden tulip. Photo by my friend Jamie.
一株红色的郁金香。照片 我的朋友杰米 (Jamie) 写的。

One of the headliner features of Python 3.4 (released in 2014) was a new library in the standard library: asyncio, provisionally introduced for feedback as an import of the external tulip library. In Python 3.5 (released in 2015), async and await were added as keywords to the language specifically for usage with asynchronous libraries, replacing the usage of yield from. The asyncio module was also made non-provisional in this release, heralding an entire new ecosystem of asynchronous libraries in the Python world.
Python 3.4(2014 年发布)的头条特性之一是标准库中新增了一个库: asyncio ,它暂时作为外部 tulip 库的导入引入,以征求反馈。在 Python 3.5(2015 年发布)中, asyncawait 被添加为关键字,专门用于异步库,取代了 yield from 。 在此版本中, asyncio 模块也变为非临时模块,这预示着 Python 世界中异步库的全新生态系统的到来。

But asyncio has so many sharp corners and design issues it is far too difficult to use, bordering on being fundamentally broken. Some of these were only realised in hindsight because other languages (Kotlin, Swift) and libraries did what asyncio does significantly better; but most of these issues were bad at release and it is baffling how the library made it out of provisional status with such glaring flaws.
但是 asyncio 存在太多缺陷和设计问题,使用起来非常困难,几乎可以说是彻底崩溃了。其中一些问题直到后来才意识到,因为其他语言(Kotlin、Swift)和库在 asyncio 功能上做得更好;但大多数问题在发布时就已经很严重了,令人费解的是,这个库是如何在存在如此明显缺陷的情况下摆脱临时状态的。

I mention the Trio library a lot in this post, but there's also the AnyIO library that implements Trio-like semantics on top of asyncio, fixing most of the issues described here whilst retaining a level of compatibility with regular asyncio libraries.
我在这里多次提到 Trio 库 帖子,但还有 AnyIO 库在 asyncio 之上实现了类似 Trio 的语义,修复了这里描述的大多数问题,同时保留了与常规 asyncio 一定程度的兼容性 图书馆。

Contents  内容

  1. Major Problem #1: Cancellation is broken
    主要问题 #1:取消功能失效
  2. Major Problem #2: Task was destroyed but it is pending!
    主要问题#2:任务已被破坏但仍然处于待处理状态!
  3. Major Problem #3: I/O has pointless landmines
    主要问题 #3:I/O 存在无意义的陷阱
  4. Major Problem #4: asyncio.Queue is difficult to use
    主要问题 #4: asyncio.Queue 难以使用
  5. Other, less major problems
    其他不太严重的问题

Major Problem #1: Cancellation is broken
主要问题 #1:取消功能失效

In the traditional model of concurrent programming using threads, there is no clean way to do cancellation. In the standard pthreads model, the only way to do cancellation is to brutally murder a thread using pthread_kill, which is nearly always a bad idea because anything the thread was using (such as locks) will be in an unknown and inconsistent state if the thread was killed in the middle of an operation. If you want a better mechanism, you need to implement it yourself by constantly polling a shared state object in-between doing work, using a loop like so:
在使用线程的传统并发编程模型中,没有简洁的取消方法。在标准的 pthreads 模型中,唯一的取消方法是使用 pthread_kill 粗暴地杀死一个线程,但这几乎总是一个坏主意,因为如果线程在操作过程中被杀死,该线程正在使用的任何内容(例如锁)都将处于未知且不一致的状态。如果您想要一个更好的机制,您需要自己实现它,方法是在执行工作之间不断轮询共享状态对象,使用如下循环:

cancelled = threading.Event()

def t1():
    while not cancelled.is_set():
        do_work()

def main():
    threading.Thread(target=t1).start()
    # ... do something inbetween
    cancelled.set()

This is unergonomic and error-prone as only threads that opt-in to this cancellation mechanism can be cancelled and only when they explicitly check if they are cancelled. Some languages (i.e. Java) make this a bit better by having a Thread.interrupt() method that handles dealing with communicating the interrupted state, with most standard library functions such as Object.wait() automatically checking for the interrupted state. (This still falls victim to the other issues described here.)
这很不人性化,而且容易出错,因为只有启用此取消机制的线程才能被取消,而且只有在它们明确检查是否被取消的情况下才能被取消。有些语言(例如 Java)通过 Thread.interrupt() 方法处理以下情况,从而稍微改善了这种情况: 处理中断状态的通信,大多数标准库函数如下 Object.wait() 自动检查中断状态。(这仍然会受到本文所述其他问题的影响。)

asyncio is an asynchronous runtime and is responsible for its own scheduling of its own tasks, instead of the kernel. When an asyncio task needs to interact with the system, it asks the event loop to suspend it until an operation is complete whereupon the task will be rescheduled and will run again in the next tick of the event loop. Threads do the same, but with system calls, and the user application has no control over the kernel's scheduler beyond tweaking some tuning parameters.
asyncio 是一个异步运行时,它负责自身任务的调度,而不是内核的调度。当 asyncio 任务需要与系统交互时,它会请求事件循环将其暂停,直到某个操作完成,然后该任务将被重新调度,并在事件循环的下一个 tick 中再次运行。线程也会执行相同的操作,但需要使用系统调用,并且用户应用程序除了调整一些调优参数外,无法控制内核的调度程序。

This scheduling mechanism is reused to implement cancellation. When a task is cancelled, any pending operation that the event loop was performing for a task is cancelled, and instead the call raises a CancelledError. Unlike threads tasks no longer need to check if they have been cancelled; every single time a call drops into the event loop the runtime itself checks for cancellation. Conceptually, you can imagine every await as a cancellation point:
此调度机制可重用于实现取消。当任务被取消时,事件循环正在为该任务执行的任何待处理操作都将被取消,并且调用会引发 CancelledError 。与线程不同,任务不再需要检查它们是否已被取消;每次调用进入事件循环时,运行时都会自行检查是否被取消。从概念上讲,你可以将每个 await 想象为 取消点

async def something(stream: SomeLibraryStream):
    while True:
        result = await stream.read()  # Cancellation point
        parsed = do_parse(result)     # *not* a cancellation point

From this, we can derive a conceptual model of how tasks and cancellations interact:
由此,我们可以得出任务和取消如何相互作用的概念模型:

  1. Tasks run until an await, at which point they suspend.
    任务一直运行直到 await ,此时它们会暂停。
  2. Something else calls task.cancel(), which reschedules the task again.
    其他方法调用 task.cancel() ,重新安排任务。
  3. The function that was being await-ed now raises Cancelled.
    正在 await 函数现在引发了 Cancelled
  4. This exception propagates backwards, unwinding through all functions in the call stack and cleaning up as it goes.
    此异常向后传播,展开调用堆栈中的所有函数并在进行过程中进行清理。

This avoids both problems with threads: tasks can be externally killed without worrying about resources not being torn down, and end-user tasks don't need to constantly check if they've been cancelled because the event loop does it for you.
这避免了线程的两个问题:可以从外部终止任务而不必担心资源不会被拆除,并且最终用户任务不需要不断检查它们是否已被取消,因为事件循环会为您完成此操作。

But that's not how it works
但事实并非如此

Consider this function below that returns a resource wrapped in an asynchronous context manager. When the user is done, it needs to clean up some resources (say, a server needs a clean close). This cleanup should be done regardless of if the code running inside the context manager was successful or not, so it's ran inside a finally block:
考虑下面这个函数,它返回一个封装在异步上下文管理器中的资源。当用户完成操作后,它需要清理一些资源(比如,服务器需要彻底关闭)。无论上下文管理器中的代码运行是否成功,都应该进行此清理,因此它在 finally 块中运行:

@asynccontextmanager
async def connect_to_server(ip: str, *, port: int = 6767) -> AsyncIterator[Sock]:
    sock = await connect_socket(ip, port)

    async with sock:
        await sock.send(b"IDENTIFY ident :bar\r\nNICKNAME :gquuuuuux)\r\n")
        try:
            yield sock
        finally:
            await sock.send(b"QUIT :died to some small fry")

In this case, let's say .send() waits for some form of acknowledgement message. There's also another task that is spawned somewhere, and it's sending a PING message to the server every few seconds and expecting a PONG message. If the client goes too long without receiving a PONG, it cancels the task inside the context manager and exits itself.
在这种情况下,假设 .send() 等待某种形式的确认消息。 还有另一个任务在某处产生,它正在发送一个 每隔几秒向服务器发送 PING 消息,并等待 PONG 消息。如果客户端长时间没有收到 PONG 消息,它会取消上下文管理器内的任务并退出。

What happens if the server does stop responding, and the task is cancelled? Let's see:
如果服务器停止响应,任务被取消,会发生什么?让我们看看:

  1. First, any code in the user function running inside the asynchronous context manager is cancelled with a CancelledException bubbling upwards.
    首先,异步上下文管理器内运行的用户函数中的任何代码都会被取消,并产生向上冒泡的 CancelledException

  2. Next, the yield sock expression raises a CancelledException, and control flows into the finally block.
    接下来, yield sock 表达式引发 CancelledException ,并且控制流进入 finally 块。

  3. The code enters the sock.send() function, which re-enters the event loop. The event loop completely forgets that the task was cancelled and is entirely happy to deadlock the application forever waiting for the server to respond to the .send() (which will never happen).
    代码进入 sock.send() 函数,重新进入事件循环。 事件循环完全忘记了任务已被取消,并且非常乐意 导致应用程序永远死锁,等待服务器响应 .send() (永远不会发生)。

This is because cancellations in asyncio are edge-triggered, not level-triggered. These concepts are mostly used in the world of electronics, but are also applicable to certain types of programming too; an edge-triggered event only fires once when the state changes. In this case, it's Task.cancel() firing a cancellation error exactly once. This is the opposite behaviour to level-triggered cancellations, where cancelling a task will cause all calls to the event loop to raise a CancelledException, forever.
这是因为 asyncio 中的取消是边缘触发的 ,而不是 电平触发 。这些概念主要用于电子领域,但也适用于某些类型的编程; 边沿触发事件只会触发 状态改变时执行一次 。在本例中, Task.cancel() 只会触发一次取消错误。这与级别触发的取消行为相反,在级别触发的取消中,取消任务会导致 所有对事件循环的调用都会引发 CancelledException ,永远。

Here's a more practical example that you can run directly on your computer to see this behaviour.
这是一个更实际的示例,您可以直接在计算机上运行它来查看此行为。

import asyncio

event = asyncio.Event()

async def fn():
    try:
        event.set()
        await asyncio.sleep(60)
    finally:
        await asyncio.sleep(5)
        print("slept for 5s")

async def main():
    task = asyncio.create_task(fn())
    await event.wait()
    task.cancel()
    await asyncio.sleep(10)

asyncio.run(main())

When you run this, the first sleep(60) will be cancelled, and then the program will sleep for five more seconds before printing a slept for 5s message because the cancellation disappeared.
当您运行此命令时,第一个 sleep(60) 将被取消,然后程序将再休眠五秒钟,然后打印 slept for 5s 消息,因为取消操作消失了。

This is absolutely 100% the wrong behaviour and it makes cancellations dangerous when it can be swallowed or covered up at any point.
这绝对是 100% 错误的行为,而且当这种取消行为可能在任何时候被接受或掩盖时,它就会变得很危险。

  • Using a bare except:? Swallows cancellations. People will lie and say that they don't write these, but people do use bare excepts. Even if you don't, do you know that every other library doesn't?
    使用裸 except: 会吞下取消操作。人们会撒谎说他们没有写过这些,但人们确实会使用裸 excepts 。即使 不知道,你知道其他图书馆也不知道吗?

  • Doing cleanup in __aexit__? Can deadlock waiting for something that will never happen, swallowing the cancellation.
    __aexit__ 中进行清理?等待某些操作会导致死锁吗? 永远不会发生,吞下取消。

  • Doing cleanup in try/finally? See above.
    try/finally 中进行清理?参见上文。

It could be better
可能会更好

Graceful asynchronous cleanup is intrinsically a difficult problem; if an operation blocks for too long, what do you do? If you adopt a rigid rule of always trying to be graceful you risk running into deadlocks if the operation never returns. If you simply avoid doing anything gracefully and just sever connections and open files with a machete you can end up with half-written data or some very unhappy servers on the other end. It doesn't really matter in the asyncio world, because the library doesn't give you any tools to implement this.
优雅的异步清理本质上是一个难题;如果一个操作阻塞太久,你该怎么办?如果你制定了一条严格的规则, 总是试图做到优雅,那么 如果操作永远不返回,就会面临死锁的风险。如果你只是避免做任何事情 优雅地切断连接,用砍刀打开文件,你最终会得到 数据写入一半,或者另一端的服务器非常不顺畅。这在 asyncio 世界,因为该库没有提供任何工具来实现这一点。

The Trio library takes the opposite approach; all cancellations are level-triggered. Let's port the sleeping example above to use Trio instead:
Trio 库采用了相反的方法;所有取消操作都是级别触发的 。让我们将上面的睡眠示例移植到 Trio 中:

import trio

event = trio.Event()

async def task():
    try:
        event.set()
        await trio.sleep(60)
    finally:
        await trio.sleep(5)
        print("slept for 5s")

async def main():
    async with trio.open_nursery() as n:
        n.start_soon(task)
        await event.wait()
        n.cancel_scope.cancel()
        await trio.sleep(10)  # Not needed, but for parity with the previous example.

trio.run(main)

Running this will produce... no output. It won't wait either, because anything that could wait has been cancelled. If you add a print() between the event.wait and the cancel_scope.cancel(), that will print something too, so it's not exiting early because it's not running anything.
运行此代码不会产生任何输出。它也不会等待,因为所有可以等待的操作都已被取消。如果在 event.waitcancel_scope.cancel() 之间添加 print() ,它也会打印一些内容,所以它不会因为没有运行任何操作而提前退出。

This then asks a question: How do you do graceful cleanup? With shielded cancel scopes and timeouts. I'll replace the finally block above with one of those:
这就引出了一个问题:如何优雅地进行清理?使用屏蔽取消范围和超时。我将上面的 finally 块替换为以下代码之一:

    finally:
        with trio.move_on_after(1, shield=True):
            await trio.sleep(5)
        print("slept for 1s?")
        await trio.sleep(5)
        print("slept for 5s?")

Running this will print slept for 1s?, but nothing more. The code running inside the context manager ignored the outside cancellation, but was re-cancelled after a second anyway. This once again nets you the best of both worlds: cancellations aren't swallowed unless you explicitly opt-in. Remember the Zen of Python: Explicit is better than implicit.
运行此代码会打印 slept for 1s? ,但仅此而已。上下文管理器内部运行的代码忽略了外部取消,但一秒钟后又被取消了。这再次为您带来了两全其美的效果:除非您明确选择加入,否则取消不会被吞噬。记住 Python 之禅:显式优于隐式。

Major Problem #2: Task was destroyed but it is pending!
主要问题#2:任务已被破坏但仍然处于待处理状态!

If you've ever used an asyncio application, you've probably seen that message pop up before. As an example, if I Ctrl-C portage too quickly, it spits out a few of those errors. Why? Because asyncio does not keep strong references to tasks. Quoting the official documentation:
如果你曾经使用过 asyncio 应用程序,你可能之前见过这样的弹出消息。 例如,如果我按 Ctrl-C portage 运行速度太快,会报出一些类似的错误。为什么?因为 asyncio 没有保留对任务的强引用。官方文档是这样说的:

Important  重要的

Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done. For reliable “fire-and-forget” background tasks, gather them in a collection:
保存对此函数结果的引用,以避免任务在执行过程中消失。事件循环仅保留对任务的弱引用。未在其他地方引用的任务可能随时被垃圾回收,甚至在完成之前。为了实现可靠的“即发即弃”后台任务,请将它们收集到一个集合中:

Let's take some example code:
让我们来看一些示例代码:

import asyncio, gc


async def expose_bugs():
    while True:
        await asyncio.sleep(0.5)
        # Simulate doing work that would have the GC fire.
        gc.collect()


async def has_bug():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    await fut


async def main():
    t = asyncio.create_task(expose_bugs())
    asyncio.create_task(has_bug())
    await asyncio.sleep(5)


asyncio.run(main())

If you run this, it will print a warning to stderr about how has_bug was destroyed when it was pending. has_bug has no strong references to it, so when the GC runs the weak reference the event loop holds is removed and the task is dropped on the floor. Goodbye, has_bug.
如果你运行这个程序,它会向 stderr 打印一条警告,说明 has_bug 在挂起时是如何被销毁的。has_bug 没有对它 has_bug 强引用,所以当 GC 运行时,事件循环持有的弱引用会被移除,任务也会被丢弃。再见了, has_bug

This is very obviously insane behaviour, but it can somewhat be avoided by always holding references to spawned tasks (similarly to how you can avoid segmentation faults by always doing bounds checking). But it gets worse. There's a set of helper functions that are used for corralling tasks around: wait_for, gather, or shield; these can all cause a function being waited on to be dropped on the floor because they internally spawn said function as a task and wait on that instead:
这显然是疯狂的行为,但可以通过以下方式避免 始终持有对衍生任务的引用(类似于通过始终进行边界检查来避免分段错误)。但情况会更糟。有一组辅助函数可用于整理任务: wait_forgathershield ;这些都可能导致正在等待的函数被丢弃,因为它们在内部生成所述函数作为任务并等待

import asyncio, gc


async def expose_bugs():
    while True:
        await asyncio.sleep(0.5)
        # Simulate doing work that would have the GC fire.
        gc.collect()


async def has_bug():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    await fut


async def shield_task():
    await asyncio.shield(has_bug())


async def main():
    t1 = asyncio.create_task(expose_bugs())
    t2 = asyncio.create_task(shield_task())
    # scheduling pass
    await asyncio.sleep(1)
    t2.cancel()
    await asyncio.sleep(2)


asyncio.run(main())

When t2 is cancelled, the outer await asyncio.shield(...) call is cancelled. The cancellation doesn't propagate through into has_bug because of the shielding, and the outer task still has a strong reference in the form of t2. But has_bug's task has no strong references to it; the only reference was in the local variables of the shield() functions. The next time the event loop ticks, gc.collect() is called, which drops the has_bug task entirely.
t2 被取消时,外层的 await asyncio.shield(...) 调用也会被取消。由于屏蔽机制的存在,取消操作不会传递到 has_bug 中,外层任务仍然以 t2 的形式拥有强引用。但是 has_bug 的任务没有对它的强引用;唯一的引用来自 shield() 函数的局部变量。下次事件循环执行时, 调用 gc.collect() ,彻底删除 has_bug 任务。

You might try to avoid this by doing create_task explicitly as this will keep a strong reference to the has_bug() task in the local variables of the cancelled generator coroutine for shield_task, like so:
您可以尝试通过明确执行 create_task 来避免这种情况,因为这将在 shield_task 的已取消生成器协程的局部变量中保留对 has_bug() 任务的强引用,如下所示:

async def shield_task():
    inner = asyncio.create_task(has_bug())
    await asyncio.shield(inner)

But this only works all the while the handle to t2 lives inside main(). If that handle gets dropped, then the inner has_bug will also get dropped! Adding a del t2 after the t2.cancel() will expose this immediately. Good luck tracking this through a web of classes and tasks.
但这只在 t2 的句柄还在里面时才有效 main() 。如果该句柄被丢弃,那么内部的 has_bug也会被丢弃!在 t2.cancel() 之后添加 del t2 可以立即发现这个问题。祝你好运,通过网上的课程和任务来追踪这个问题。

Major Problem #3: I/O has pointless landmines
主要问题 #3:I/O 存在无意义的陷阱

The underlying API for performing network I/O is the ever-venerable BSD socket API. Python exposes a nice object-based API for working with sockets; let's look at some code that opens a connection on a socket and sends some data.
执行网络 I/O 的底层 API 是久负盛名的 BSD 套接字 API。Python 公开了一个基于对象的 API 来处理套接字;让我们看一段在套接字上打开连接并发送数据的代码。

s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.connect(("2001:708:40:2001::11ba", 6667))
s.send(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
motd = s.recv(4096)
s.shutdown(socket.SHUT_RDWR)  # Graceful close, send an EOF
s.close()

This is pretty bare-bones, but it's easy to see how the code flows: top to bottom. It's a set of linear statements:
这很简单,但很容易看出代码的流程:从上到下。它是一组线性语句:

  1. Create the socket with the appropriate options.
    使用适当的选项创建套接字。
  2. Connect it to an address directly.
    直接将其连接到地址。
  3. Send some data from a socket.
    从套接字发送一些数据。
  4. Receive some data from the socket.
    从套接字接收一些数据。
  5. Shut it down and close the socket.
    将其关闭并关闭套接字。

This all happens in order; it's simple to follow. Trio offers an asynchronous version of this, so let's rewrite the code to be identical with Trio sockets:
这一切都按顺序进行;很容易理解。Trio 提供了一个异步版本,因此让我们将代码重写为与 Trio 套接字相同:

s = trio.socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
await s.connect(("2001:708:40:2001::11ba", 6667))
await s.send(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
motd = s.recv(4096)
s.shutdown(socket.SHUT_RDWR)  # Graceful close, schedules an EOF
s.close()

The code is almost identical, with some await statements introduced before every function that would normally block. Again, the control flow is simple; it flows from top to bottom in a linear fashion. Let's look at asyncio's version of sockets, which are called protocols:
代码几乎完全相同,只是在每个通常会阻塞的函数之前引入了一些 await 语句。同样,控制流很简单;它以线性方式从上到下流动。让我们看一下 asyncio 版本的套接字,它们被称为协议:

import asyncio


class IdentifyProtocol(asyncio.Protocol):
    def __init__(self, message: bytes, motd_future: asyncio.Future):
        self.message = message 
        self.motd = motd_future

    def connection_made(self, transport: asyncio.WriteTransport):
        transport.write(self.message.encode())

    def data_received(self, data: bytes):
        self.motd.set_result(data)

    def connection_lost(self, exc: BaseException):
        ...


fut = loop.create_future()
transport, protocol = await loop.create_connection(
    partial(EchoClientProtocol, b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n", fut),
    "2001:708:40:2001::11ba", 
    6667,
)

motd = await protocol.motd

Unlike regular BSD sockets or Trio's socket wrappers, asyncio uses callbacks - synchronous ones, at that - to implement the low-level I/O primitives. The control flow here jumps around a lot:
与常规 BSD 套接字或 Trio 的套接字包装器不同, asyncio 使用 回调 ——当然是同步回调——来实现底层 I/O 原语。这里的控制流跳转了很多:

  1. create_connection is equivalent to socket.socket + socket.connect. Okay. You don't get to set socket options (at least it sets TCP_NODELAY) and it doesn't work for anything other than regular AF_INET/AF_INET6 sockets.
    create_connection 相当于 socket.socket + socket.connect 。好的。你不需要设置套接字选项(至少它设置了 TCP_NODELAY ),并且它不适用于除常规之外的任何东西 AF_INET/AF_INET6 套接字。

    It returns a tuple of (write transport, protocol instance); the former can be used to send further data (synchronously).
    它返回一个 (write transport, protocol instance) 元组;前者可以 用于发送进一步的数据(同步)。

  2. When the socket is opened, it jumps into my class and calls (synchronously) connection_made, providing a "transport" which I can call the (synchronous) write method on to send my authentication method.
    当套接字打开时,它会跳入我的类并调用(同步) connection_made ,提供一个“传输”,我可以调用(同步) write 方法来发送我的身份验证方法。

    There's no way to wait for this to be sent as WriteTransport.write is synchronous. It'll get sent at some point in the future. Maybe. If you want to let somebody know that you've sent the message, you'll need to implement that yourself too.
    没有办法等待这个被发送,因为 WriteTransport.write 是 同步。它会在未来某个时间点发送。也许吧。如果你想让 有人知道您已发送消息,您也需要自己实现这一点。

  3. After some time, the server will respond; the event loop calls (synchronously) data_received, providing the received data. If you want to do something with this data (asynchronously), you need to pass it to the outside world yourself using futures or queues. In this case, I've implemented it with a regular Future; I haven't even thought about how to swap the future out in a non-error prone way for future reads yet.
    一段时间后,服务器将响应;事件循环调用(同步) data_received ,提供接收到的数据。如果您想(异步地)处理这些数据,则需要使用 Future 或队列将其传递给外部世界。在本例中,我使用常规的 Future 来实现它; 我甚至还没想过如何以一种不易出错的方式将未来换成 未来还会读。

  4. The outside world now reads the data from the future. That's three separate places I've had to deal with the data, versus a single place with a linear order for sockets.
    现在,外部世界会读取来自未来的数据。我之前必须在三个不同的地方处理这些数据,而不是在一个地方按套接字的顺序进行线性处理。

The biggest difference between raw sockets and protocols is that protocols have their incoming data pushed in to you. If you want to simply wait for data to arrive, you need to implement that yourself! This is only a basic protocol; more complex protocols require more implementing more complicated synchronisation mechanisms manually to communicate between the entirely synchronous protocol callbacks leading to a mess of either create_task everywhere or manually shuffling futures/events around.
原始套接字和协议之间最大的区别在于,协议会将其传入的数据推送给您。如果您只想等待数据到达,则需要 自己实现它!这只是一个基本协议;更复杂的协议需要更多 手动实现更复杂的同步机制,以便在 完全同步的协议回调会导致 到处 create_task 或手动调整未来/事件。

Why is it like this? Because Twisted was like this. But Twisted existed in a world before yield from or await, so it has an excuse. asyncio copied it in a world with yield from and await, so it has no excuse.
为什么会这样?因为 Twisted 就是这样的。但 Twisted 存在于另一个世界之前 yield fromawait ,所以它有一个借口。asyncio 在具有 yield fromawait 世界中复制了它,因此 asyncio 没有借口。

And no, the answer is also not "because Windows doesn't support a Unix-style select() API properly". If you want select() semantics on Windows, use \Device\Afd like everyone else does (and by everyone else, I mean the entire Javascript and Rust ecosystem).
不,答案也不是“因为 Windows 不支持 Unix 风格 select() API 正确”。如果您希望在 Windows 上使用 select() 语义,请使用 \Device\Afd 类似 其他人也这么做(而其他人也这么做,我 指的是整个 Javascript 和 Rust 生态系统)。

That's not fair  这不公平

That's true. It's rare that you'll actually interact with protocols; they are a weird implementation detail of asyncio's event loop mechanisms. The same goes for Trio sockets, but at least for sockets you can use them for esoteric mechanisms like AF_NETLINK or SOCK_RAW whilst still retaining the nice asynchronous API. (You can implement those socket types on asyncio with the even lower level APIs of add_{reader|writer}, but that's not a topic for today).
确实如此。你很少会真正与协议交互;它们是 asyncio 事件循环机制的一个奇怪的实现细节。Trio 也是如此。 套接字,但至少对于套接字,你可以将它们用于深奥的机制,例如 AF_NETLINKSOCK_RAW ,同时仍然保留良好的异步 API。(您可以使用 add_{reader|writer} 等更底层的 API 在 asyncio 上实现这些套接字类型,但这不是今天的主题)。

Instead most asyncio and Trio programs will use streams, a high-level generic API that treats network connections as nothing more than a stream of bytes. Here's how the previous socket example would be written using Trio's streams:
相反,大多数 asyncio 和 Trio 程序将使用 streams ,这是一种高级通用 API,它将网络连接视为字节流。以下是使用 Trio 的流编写上一个套接字示例的方法:

async with trio.open_tcp_stream("irc.libera.chat", port=6667) as stream:
    # type: trio.SocketStream
    await stream.send_all(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")

This is very simple; the returned stream works as an asynchronous context manager that automatically closes the socket when done, regardless of if the inner code succeeds or fails. The send_all method will automatically retry when the underlying socket returns a partial write, so the user doesn't need to implement retry logic for partial writes by hand.
这非常简单;返回的流充当异步上下文管理器,无论内部代码执行成功还是失败,完成后都会自动关闭套接字。当底层套接字返回部分写入时, send_all 方法会自动重试,因此用户无需手动实现部分写入的重试逻辑。

Here's how you do it in asyncio:
以下是在 asyncio 中执行的操作:

reader, writer = await asyncio.open_connection("irc.libera.chat", port=6667)
try:
    writer.write(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
    await writer.drain()
finally:
    writer.close()
    await writer.wait_closed()

This is similar to the Trio example with two major differences:
这与 Trio 示例类似,但有两个主要区别:

  • writer.write is synchronous and does not actually perform a full write unless drain() is called.
    writer.write同步的 ,除非调用 drain() 否则实际上不会执行完整的写入。
  • writer.close does not actually perform a close, only schedules it, and you need to use wait_closed to ensure the stream is closed.
    writer.close 实际上并不执行关闭,只是安排它,并且您需要使用 wait_closed 来确保流已关闭。

    Also, wait_closed will block if the drain method is cancelled. The cancellation issues are everywhere.
    此外,如果 drain 方法被取消, wait_closed 将会阻塞。 取消问题随处可见。

The write/drain pair exists entirely as a footgun for anyone who forgets to call drain. Data may get written in the background if you don't call drain(), but if you're in a tight loop with lots of data to write and no other await calls, it will buffer all of that data into the stream's internal buffer without sending it. Even if you do have the writer task rescheduled, the buffer may still fill up anyway if data is being written faster than the background writer can empty it. This is stupid!
write / drain 函数的存在完全是为了避免忘记调用 drain 函数。如果你没有调用,数据可能会在后台写入。 drain() ,但如果你处于一个需要写入大量数据的紧密循环中,并且没有其他 await 调用时,它会将所有数据缓冲到流的内部缓冲区中,而不会发送。即使你重新安排了写入任务,如果数据写入速度快于后台写入器清空缓冲区的速度,缓冲区仍然可能被填满。这太蠢了!

It's a not too dissimilar situation with close/wait_closed; close() schedules a close and wait_closed waits for that close to actually be sent. What happens if wait_closed is cancelled? asyncio doesn't really define the semantics for this, unlike the Trio world which very explicitly does. In the Trio world, all closeable objects follow the AsyncResource ABC, which defines an idempotent aclose method that must always succeed.
这与 close / wait_closed 情况不太相似; close() 会安排一次关闭操作,而 wait_closed 则会等待该关闭操作实际发送。如果 wait_closed 被取消了会发生什么? asyncio 并没有真正定义它的语义,不像 Trio 世界非常明确地做到了这一点。在 Trio 世界中,所有可关闭的对象都遵循 AsyncResource ABC,它定义了一个幂等的 aclose 方法,该方法必须始终成功

So what happens for protocols such as TLS that need a graceful goodbye message sent? Trio's SSL helpers will try and send a graceful close, and if that times out the stream will be severed by force instead. The end-user doesn't need to know anything about this; they can call aclose on a resource to close it and not worry about if it will be cancelled or if the resource is actually closed.
那么,像 TLS 这样的协议需要发送优雅的告别消息时会发生什么呢?Trio 的 SSL 助手将尝试发送一个优雅的结束,如果超时,流将被切断 强制执行。最终用户不需要知道任何有关此的内容;他们可以调用 对资源 aclose 关闭它,而不必担心它是否会被取消或者资源是否真的关闭。

Major Problem #4: asyncio.Queue is difficult to use
主要问题 #4: asyncio.Queue 难以使用

I have two tasks: a producer (that makes messages) and a consumer (that eats messages). Here they are:
我有两个任务:一个生产者(负责发送消息)和一个消费者(负责接收消息)。它们分别是:

async def producer():
    while True:
        message = await do_some_networking_thing()
        # i don't know how to send a message...

async def consumer():
    while True:
        message = # i don't know how to receive a message...
        await eat(message)

How do I get messages between them? I could use a Future, but that would only work exactly once and both of these functions are running in a loop. I could find a way to ferry Future instances between them, but if I could do that I would use the ferry to communicate the messages instead.
我如何在它们之间传递消息?我可以使用 Future ,但那只有效。 恰好一次,并且这两个函数都在循环运行。我可以找到一种方法来 他们之间 Future 例子,但如果可以的话,我会使用渡轮来传达信息。

The solution is an asyncio.Queue, which is the asynchronous version of queue.Queue (which is the Python version of java.util.concurrent.ArrayBlockingQueue). Let's pass a queue to both functions:
解决方案是 asyncio.Queue ,它是异步版本 queue.Queue (这是 Python 版本的 java.util.concurrent.ArrayBlockingQueue )。让我们将一个队列传递给这两个函数:

async def producer(queue: asyncio.Queue):
    while True:
        message = await do_some_networking_thing()
        await queue.put(message)

async def consumer(queue: asyncio.Queue):
    while True:
        message = await queue.get()
        await eat(message)

async def main():
    queue = asyncio.Queue()
    t1 = asyncio.create_task(producer(queue))
    t2 = asyncio.create_task(consumer(queue))

    while True:
        await asyncio.sleep(99999999)

asyncio.run(main())

This will have the producer loop forever creating items and putting them in the queue, and the consumer will loop forever reading items from thee queue and doing something with them. This is a very common pattern which is similar to communicating sequential processes. But what happens if consumer throws an exception in eat? Let's go over the control flow:
这将使生产者无限循环地创建项目并将其放入队列,而消费者也将无限循环地从队列中读取项目并执行相应的操作。这是一种非常常见的模式,类似于通信顺序进程 。但是,如果 consumereat 中抛出异常会发生什么?让我们回顾一下控制流:

  1. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  2. consumer receives an item and calls eat.
    consumer 收到一件商品并调用 eat
  3. eat raises an exception and the consumer task dies. For the sake of understanding, this exception is a transient external exception and is not related to either the code or the item being consumed.
    eat 会引发异常,消费者任务也会终止。为了 理解,此异常是暂时的外部异常,与 正在消费的代码或物品。
  4. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  5. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  6. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  7. Your system locks up as the out-of-memory killer fails to run.
    由于内存不足杀手无法运行,您的系统将被锁定。

This is because the consumer exerts no backpressure on the producer; the producer will gladly keep sending items into the queue forever that nobody is listening to. I can add some backpressure by using asyncio.Queue(maxsize=1), which changes the control flow like so:
这是因为消费者不会对生产者施加背压 ;生产者会很乐意地一直往队列里发送数据,即使没有人监听。我可以使用 asyncio.Queue(maxsize=1) 来增加一些背压,它会改变控制流,如下所示:

  1. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  2. consumer receives an item and calls eat.
    consumer 收到一件商品并调用 eat
  3. eat raises an exception and the consumer task dies.
    eat 引发异常并且消费者任务终止。
  4. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  5. producer produces an item, tries sending it to the queue, but blocks forever because there's nobody reading from the queue.
    producer 生产一个项目,尝试将其发送到队列,但永远阻塞 因为没有人在读取队列。

That's a little bit better in the sense it won't leak memory forever, but instead it will lock up forever because the producer has no way of knowing that the consumer isn't listening anymore. In Python 3.13 the Queue.shutdown method was added which lets one (or both) sides know that the queue is closed and can't accept (or receive) any new items. Let's adjust the code to use that:
这样稍微好一点,因为它不会永远泄漏内存,但会永远锁住,因为生产者无法知道消费者是否已经停止监听。在 Python 3.13 中,添加了 Queue.shutdown 方法,它可以让一方(或双方)知道队列已关闭,无法接受(或接收)任何新项目。让我们调整代码来使用这个方法:

If you're stuck on Python 3.12 or earlier, there's no Queue.shutdown available.
如果你还在使用 Python 3.12 或更早版本,那么就没有 Queue.shutdown 可用的。

async def consumer(queue: asyncio.Queue):
    while True:
        message = await queue.get()

        try:
            await eat(message)
        except:
            queue.shutdown()
            raise

Now the control flow goes as follows:
现在控制流程如下:

  1. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  2. consumer receives an item and calls eat.
    consumer 收到一件商品并调用 eat
  3. eat raises an exception and the consumer task dies.
    eat 引发异常并且消费者任务终止。
  4. producer produces an item and tries sending it to the queue, but fails because the queue is shut down.
    producer 生产一个项目并尝试将其发送到队列,但失败了,因为 队列已关闭。

Except... that's not true. There's a race condition going on between steps three and four; if producer puts an item into the queue before the consumer task is killed, then the item that was sent to the queue remains there forever. There's a pair of methods, join() and task_done that can solve this, meaning my code now looks like this:
但……事实并非如此。第三步和第四步之间存在竞争条件;如果 producer 在消费者任务被终止之前将一个项目放入队列, 那么发送到队列的项目将永远保留在那里。有两个方法: join()task_done 可以解决这个问题,这意味着我的代码现在看起来像这样:

async def producer(queue: asyncio.Queue):
    while True:
        message = await do_some_networking_thing()
        await queue.put(message)
        await queue.join()

async def consumer(queue: asyncio.Queue):
    while True:
        try:
            message = await queue.get()
            await eat(queue)
        except:
            queue.shutdown(immediate=True)
            raise
        else:
            queue.task_done()

And the control flow goes as follows:
控制流程如下:

  1. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  2. producer begins blocking until the consumer calls task_done.
    producer 开始阻塞,直到消费者调用 task_done
  3. consumer receives an item and calls eat.
    consumer 收到一件商品并调用 eat
  4. eat raises an exception and the consumer task dies. The queue is shut down.
    eat 抛出异常,消费者任务终止。队列被关闭。
  5. queue.join wakes up because I passed immediate=True. If I didn't pass that, it would block forever instead.
    queue.join 被唤醒,因为我传递了 immediate=True 。如果我 如果没有通过,它将永远阻塞。
  6. producer produces an item and tries sending it to the queue, but put fails because the queue is shut down.
    producer 生产了一个项目并尝试将其发送到队列,但是 由于队列已关闭,因此 put 失败。

This eliminates the race condition entirely. This isn't a very useful pattern because with one consumer and one producer it can be generalised into just calling the consumer function from the producer. It would be more useful if I add a second consumer, assuming consumers are slower than the producer:
这完全消除了竞争条件。这不是一个非常有用的模式,因为只有一个消费者和一个生产者,可以概括为只从生产者调用消费者函数。如果我添加第二个消费者,假设消费者比生产者慢,那么会更有用:

  1. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  2. producer begins blocking until the consumer calls task_done.
    producer 开始阻塞,直到消费者调用 task_done
  3. Consumer task 1 receives an item and calls eat.
    消费者任务 1 收到一个物品并调用 eat
  4. Consumer task 2 sits there idly because the producer can't do anything until the first consumer task has finished.
    消费者任务 2 处于空闲状态,因为生产者在第一个消费者任务完成之前无法执行任何操作。
  5. Consumer task 1 has an exception, and shuts down the queue.
    消费者任务1发生异常,并关闭队列。
  6. Consumer task 2 has an exception because the queue was closed.
    消费者任务 2 发生异常,因为队列已关闭。
  7. Everything explodes in a fiery mess of exceptions.
    一切都在异常的混乱中爆发。

To fix this, consumer task 1 won't shut down the queue but will restart itself, perhaps from an external supervisor.
为了解决这个问题,消费者任务 1 不会关闭队列,而是会自行重新启动,可能是通过外部监控器。

async def consumer(queue: asyncio.Queue):
    while True:
        try:
            message = await queue.get()
            await eat(queue)
        except Exception:
            logger.exception()
            return
        else:
            queue.task_done()

Let's look at the control flow for a final time:
让我们最后看一下控制流:

  1. producer produces an item and sends it to the queue.
    producer 生产一个项目并将其发送到队列。
  2. producer begins blocking until the consumer calls task_done.
    producer 开始阻塞,直到消费者调用 task_done
  3. Consumer task 1 receives an item and calls eat.
    消费者任务 1 收到一个物品并调用 eat
  4. Consumer task 2 sits there idly because the producer can't do anything until the first consumer task has finished.
    消费者任务 2 处于空闲状态,因为生产者在第一个消费者任务完成之前无法执行任何操作。
  5. Consumer task 1 has an exception, and returns.
    消费者任务1发生异常,返回。
  6. Consumer task 2 remains blocking on get.
    消费者任务 2 仍然阻塞在 get 上。
  7. producer continues blocking on join.
    producerjoin 时继续阻塞。
  8. The freshly rebooted consumer task 1 starts blocking on get
    刚刚重启的消费者任务 1 开始阻塞 get

This could be fixed by making the first consumer task try and re-insert an item on an exception, but what happens if the second task has had an error? Deadlocks. At this point, I give up and pull in an AMQP server instead of dealing with in-library queues.
这个问题可以通过让第一个消费者任务在发生异常时尝试重新插入数据来解决,但如果第二个任务也出错了怎么办?死锁。这时,我放弃了,转而使用 AMQP 服务器,而不是处理库内队列。

It doesn't have to be this way
不必如此

What I'm really looking for is a combination of the following:
我真正寻找的是以下内容的组合:

  • A queue that blocks until a receiver has retrieved the item (aka, automatic .join()).
    阻塞队列,直到接收者检索到项目(又称自动 .join() )。
  • A queue that can be cloned and independently closed without affecting other consumers.
    可以克隆独立关闭而不影响 其他消费者。

Trio's channels implement these behaviours. Let's re-write the consumer/producer pattern to use channels:
Trio 的通道实现了这些行为。让我们重写消费者/生产者模式来使用通道:

async def producer(channel: trio.MemorySendChannel[Message]):
    async with channel:
        while True:
            message: Message = await do_some_networking_thing()
            await channel.send(message)


async def consumer(channel: trio.MemoryReceiveChannel[Message]):
    async with channel:
        while True:
            result = await channel.receive()

            try:
                await do_something(result)
            except Exception:
                logger.exception()
                return
            

async def main():
    send, receive = trio.open_memory_channel[Message](max_buffer_size=0)

    async with trio.open_nursery() as n:
        for _ in range(5):
            consumer_channel = receive.clone()
            n.start_soon(partial(consumer, consumer_channel))

        n.start_soon(partial(producer, send))

Trio channels with a buffer size of zero act as transfer queues, a name coined by Java 7 (released in 2011 (!!)), where the sender always waits for a receiver to take a message from the channel. Each receiver gets its own unique clone of the channel that can be independently cloned and messages are sent from the sender channel in a round-robin fashion. These clones can be independently closed without affecting the other cloned channels; only once the final receive channel is closed will the sending channel begin raising errors. TransferQueue was created four solid years before asyncio existed. I really see no excuse for this behaviour to have existed when asyncio was being developed.
缓冲区大小为零的三通道充当传输队列 ,这个名字是由 Java 7 (2011 年发布!),发送者始终等待接收者从通道中获取消息。每个接收者都会获得一个唯一的通道克隆 ,该克隆可以 独立克隆并以循环方式从发送方通道发送消息。 这些克隆可以独立关闭,而不会影响其他克隆通道;只需一次 最后的接收通道关闭后,发送通道将开始引发错误。 TransferQueue 创建时间比 asyncio 早了整整四年。我实在找不到任何理由在 asyncio 开发时就存在这种行为。

The only problem this doesn't solve is that if the consumer has an error after receiving an object, that object stays unprocessed. This is a problem with both implementations and channels don't (yet) fix this; but there's nothing in the conceptual model that would prevent some form of RetryingChannel class that blocks the producer until an item is eventually processed. The same can't really be said of Queues, which will always buffer at least one item no matter what you do.
唯一无法解决的问题是,如果消费者在收到对象后发生错误,该对象将保持未处理状态。这个问题在实现上存在,而且通道(目前)尚未修复;但概念模型中没有任何内容可以阻止某种形式的 RetryingChannel 类,该类会阻塞生产者,直到有数据被处理为止。 最终会被处理。但队列的情况并非如此,它会 无论您做什么, 始终缓冲至少一个项目。

A more detailed look at all the issues with backpressure can be read in this post by the Trio creator.
可以阅读有关背压的所有问题的更详细的介绍 在 Trio 创建者的这篇文章中

Less Major Problems, a collection
《不太重要的问题》合集

Whilst those four areas are some of the worst parts of asyncio, there's a lot of minor warts that make it unpleasant to use everywhere else.
虽然这四个方面是 asyncio 最糟糕的部分,但是还有很多小缺陷使其在其他地方使用起来不愉快。

Threads are stupid  线程很愚蠢

It is an inevitability that asynchronous code needs to use threads for computationally intensive code or for libraries that still use blocking I/O. asyncio offers two APIs for this:
对于计算密集型代码或仍然使用阻塞 I/O 的库,异步代码不可避免地需要使用线程 asyncio 为此提供了两个 API:

  • asyncio.to_thread which propagates context variables correctly to worker threads but doesn't let you specify the concurrent.futures.ThreadPoolExecutor to use.
    asyncio.to_thread 将上下文变量正确传播给工作者 线程,但不允许您指定 concurrent.futures.ThreadPoolExecutor 使用。

  • loop.run_in_executor which doesn't propagate context variables but does let you specify the ThreadPoolExecutor to use; you need to wrap every function you're passing in a Context.run call.
    loop.run_in_executor 不会传播上下文变量,但允许您指定要使用的 ThreadPoolExecutor ;您需要将传递的每个函数包装在 Context.run 调用中。

This trade-off is very niche but it also doesn't really need to exist. The more important problem with threads comes from calling back into the event loop from a thread; cancellation does not propagate properly! Take this example:
这种权衡非常小众,但实际上也没有必要存在。线程更严重的问题来自于从线程回调到事件循环;取消操作无法正确传播!举个例子:

import asyncio
from functools import partial


async def coro():
    await asyncio.sleep(5)
    print("as if i would be cancelled!")


def in_thread(loop: asyncio.AbstractEventLoop):
    fut = asyncio.run_coroutine_threadsafe(coro(), loop)
    fut.result()


async def main():
    t = asyncio.create_task(asyncio.to_thread(partial(in_thread, asyncio.get_running_loop())))
    await asyncio.sleep(0)
    t.cancel()
    await asyncio.sleep(7)


asyncio.run(main())

Running this will print as if i would be cancelled! because cancelling the to_thread task will not cancel the synchronous task running on the event loop. Let's look at how Trio does it:
运行此程序将打印 as if i would be cancelled! 因为取消 to_thread 任务不会取消正在事件循环上运行的同步任务。让我们看看 Trio 是如何做到的:

from functools import partial

import trio
import trio.from_thread
import trio.to_thread


async def async_task():
    await trio.sleep(5)
    print("looks like I survived being cancelled")
    return 1


def sync_task():
    try:
        ret = trio.from_thread.run(async_task)
    except BaseException as e:
        print("raised", e)
    else:
        print("returned", ret)


async def main():
    async with trio.open_nursery() as group:
        group.start_soon(partial(trio.to_thread.run_sync, sync_task))
        await trio.sleep(1)
        group.cancel_scope.cancel()


trio.run(main)

Cancelling the outer cancel scope will cancel the inner task and this code will print raised Cancelled as the exception (correctly) propagates outwards into the sync_task function.
取消外部取消范围将取消内部任务,并且此代码将打印 由于异常(正确)向外传播, raised Cancelled sync_task 函数。

Other, minor problems  其他小问题

  • asyncio's Unix signal API consists entirely of loop.add_signal_handler, which takes a callback and schedules it on the event loop when a single signal is received; and loop.remove_signal_handler which rips out a handler for the specific signal manually. Compare this to Trio's open_signal_receiver API which lets you listen to multiple signals with one object, uses an asynchronous context manager to ensure that the handler is cleaned up, and is an iterator instead of a callback so the control flow is linear and easier to follow.
    asyncio 的 Unix 信号 API 完全由以下部分组成 loop.add_signal_handler ,当收到单个信号时,它会接受一个回调函数并将其调度到事件循环中;以及 loop.remove_signal_handler 它会手动提取特定信号的处理程序。与 Trio 的 open_signal_receiver API 允许您使用一个对象监听多个信号,使用异步 上下文管理器来确保处理程序被清理,并且是一个迭代器而不是 回调,因此控制流是线性的并且更容易遵循。

  • Eager tasks were a performance optimisation that was added where create_task forces a task to run up to the first suspension point, as opposed to lazy tasks where they will not run until the next tick of the event loop.
    Eager 任务是一种性能优化,它被添加到 create_task 强制任务运行到第一个暂停点,如下 与惰性任务相反,惰性任务直到事件循环的下一个周期才会运行。

    Unfortunately, they were broken on release (1, 2) when interacting with TaskGroup, and libraries often depend on the explicit semantics of lazy tasks that have existed up to the present day.
    不幸的是,它们在发布时( 1、2 TaskGroup 交互时被破坏了,并且库通常依赖于 至今仍存在的懒惰任务。

  • Speaking of TaskGroups, they are a mechanism to enforce structured concurrency in an asyncio world. But due to asyncio's lack of block-based cancellation - it only supports cancellation of single tasks - there's no way to cancel entire task groups. You have to cancel the task running the TaskGroup instead, which doesn't work if you only want to cancel a nested TaskGroup and not the root one.
    说到 TaskGroup ,它们是一种强制执行的机制 在 asyncio 世界中实现结构化并发 。但由于 asyncio 缺乏 基于块的取消 - 它仅支持取消单个任务 - 无法取消整个任务组。您必须取消正在运行的任务 TaskGroup ,如果您只想取消嵌套的 TaskGroup 而不是根 TaskGroup,则此方法不起作用。

    Trio does not have this issue because it has scope cancellation instead. Code running inside a CancelScope context manager can be cancelled independently, regardless of how nested it is inside a task, instead of needing the entire task to be cancelled at the very top level.
    Trio 没有这个问题,因为它有作用域取消功能。在 CancelScope 上下文管理器中运行的代码可以独立取消。 无论它在任务中嵌套如何,而不需要整个任务 将从最高层取消。

Conclusion  结论

asyncio is not a good library. It is constantly full of sharp edges everywhere with implementation details leaking and poorly designed APIs forcing end users into odd code patterns to avoid fundamental flaws in the interfaces.
asyncio 不是一个好的库。它总是到处都充斥着尖锐的问题,实现细节泄露,API 设计糟糕,迫使最终用户为了避免接口中的根本缺陷而使用奇怪的代码模式。

Trio fixes nearly every single issue in this post. AnyIO implements Trio-like semantics on top of asyncio, whilst still letting you use most parts of libraries designed for asyncio.
Trio 几乎修复了本文中的所有问题。AnyIO 在 asyncio 之上实现了类似 Trio 的语义,同时仍然允许您使用为 asyncio 设计的库的大部分功能。