One of the headliner features of Python 3.4 (released in 2014) was a new library in the standard library: asyncio, provisionally introduced for feedback as an import of the external tulip library. In Python 3.5 (released in 2015), async and await were added as keywords to the language specifically for usage with asynchronous libraries, replacing the usage of yield from. The asyncio module was also made non-provisional in this release, heralding an entire new ecosystem of asynchronous libraries in the Python world.
But asyncio has so many sharp corners and design issues that it is far too difficult to use, bordering on being fundamentally broken. Some of these were only realised in hindsight because other languages (Kotlin, Swift) and libraries did what asyncio does significantly better; but most of these issues were bad at release and it is baffling how the library made it out of provisional status with such glaring flaws.
I mention the Trio library a lot in this post, but there's also the AnyIO library that implements Trio-like semantics on top of asyncio, fixing most of the issues described here whilst retaining a level of compatibility with regular asyncio libraries.
In the traditional model of concurrent programming using threads, there is no clean way to do cancellation. In the standard pthreads model, the only way to do cancellation is to brutally murder a thread using pthread_kill, which is nearly always a bad idea because anything the thread was using (such as locks) will be in an unknown and inconsistent state if the thread was killed in the middle of an operation. If you want a better mechanism, you need to implement it yourself by constantly polling a shared state object in-between doing work, using a loop like so:
import threading

cancelled = threading.Event()

def t1():
    while not cancelled.is_set():
        do_work()

def main():
    threading.Thread(target=t1).start()
    # ... do something in between
    cancelled.set()
This is unergonomic and error-prone, as only threads that opt in to this cancellation mechanism can be cancelled, and only when they explicitly check if they are cancelled. Some languages (e.g. Java) make this a bit better by having a Thread.interrupt() method that handles communicating the interrupted state, with most standard library functions such as Object.wait() automatically checking for the interrupted state. (This still falls victim to the other issues described here.)
asyncio is an asynchronous runtime and is responsible for scheduling its own tasks, instead of leaving that to the kernel. When an asyncio task needs to interact with the system, it asks the event loop to suspend it until an operation is complete, whereupon the task will be rescheduled and will run again in the next tick of the event loop. Threads do the same, but with system calls, and the user application has no control over the kernel's scheduler beyond tweaking some tuning parameters.
This scheduling mechanism is reused to implement cancellation. When a task is cancelled, any pending operation that the event loop was performing for a task is cancelled, and instead the call raises a CancelledError. Unlike threads, tasks no longer need to check if they have been cancelled; every single time a call drops into the event loop, the runtime itself checks for cancellation. Conceptually, you can imagine every await as a cancellation point:
async def something(stream: SomeLibraryStream):
    while True:
        result = await stream.read()  # Cancellation point
        parsed = do_parse(result)  # *not* a cancellation point
From this, we can derive a conceptual model of how tasks and cancellations interact:
1. Tasks run until they hit an await, at which point they suspend.
2. Something calls task.cancel(), which reschedules the task again.
3. The operation being await-ed now raises Cancelled.

This avoids both problems with threads: tasks can be externally killed without worrying about resources not being torn down, and end-user tasks don't need to constantly check if they've been cancelled because the event loop does it for you.
Consider this function below that returns a resource wrapped in an asynchronous context manager. When the user is done, it needs to clean up some resources (say, a server needs a clean close). This cleanup should be done regardless of whether the code running inside the context manager was successful or not, so it's run inside a finally block:
from contextlib import asynccontextmanager
from typing import AsyncIterator

@asynccontextmanager
async def connect_to_server(ip: str, *, port: int = 6767) -> AsyncIterator[Sock]:
    sock = await connect_socket(ip, port)
    async with sock:
        await sock.send(b"IDENTIFY ident :bar\r\nNICKNAME :gquuuuuux\r\n")
        try:
            yield sock
        finally:
            await sock.send(b"QUIT :died to some small fry")
In this case, let's say .send() waits for some form of acknowledgement message. There's also another task that is spawned somewhere, and it's sending a PING message to the server every few seconds and expecting a PONG message. If the client goes too long without receiving a PONG, it cancels the task inside the context manager and exits itself.
What happens if the server does stop responding, and the task is cancelled? Let's see:
1. The task is cancelled, with a CancelledError bubbling upwards.
2. The yield sock expression raises a CancelledError, and control flows into the finally block.
3. The finally block calls the sock.send() function, which re-enters the event loop. The event loop completely forgets that the task was cancelled and is entirely happy to deadlock the application forever waiting for the server to respond to the .send() (which will never happen).

This is because cancellations in asyncio are edge-triggered, not level-triggered. These concepts are mostly used in the world of electronics, but are applicable to certain types of programming too; an edge-triggered event only fires once when the state changes. In this case, it's Task.cancel() firing a cancellation error exactly once. This is the opposite behaviour to level-triggered cancellations, where cancelling a task will cause *all* calls to the event loop to raise a CancelledError, forever.
Here's a more practical example that you can run directly on your computer to see this behaviour.
import asyncio

event = asyncio.Event()

async def fn():
    try:
        event.set()
        await asyncio.sleep(60)
    finally:
        await asyncio.sleep(5)
        print("slept for 5s")

async def main():
    task = asyncio.create_task(fn())
    await event.wait()
    task.cancel()
    await asyncio.sleep(10)

asyncio.run(main())
When you run this, the first sleep(60) will be cancelled, and then the program will sleep for five more seconds before printing a slept for 5s message because the cancellation disappeared.
This is absolutely 100% the wrong behaviour, and it makes cancellations dangerous when they can be swallowed or covered up at any point.
- A bare except:? Swallows cancellations (there's a short sketch of this below). People will lie and say that they don't write these, but people do use bare excepts. Even if you don't, do you know that every other library doesn't?
- Cleanup in __aexit__? Can deadlock waiting for something that will never happen, swallowing the cancellation.
- Cleanup in try/finally? See above.

Graceful asynchronous cleanup is intrinsically a difficult problem; if an operation blocks for too long, what do you do? If you adopt a rigid rule of always trying to be graceful you risk running into deadlocks if the operation never returns. If you simply avoid doing anything gracefully and just sever connections and open files with a machete you can end up with half-written data or some very unhappy servers on the other end. It doesn't really matter in the asyncio world, because the library doesn't give you any tools to implement this.
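To make the bare except: point concrete, here's a minimal sketch (the coroutine names are mine) of a task that shrugs off being cancelled:

import asyncio

async def stubborn():
    try:
        await asyncio.sleep(3600)
    except:  # a bare except catches CancelledError too
        pass  # ...and the edge-triggered cancellation is gone for good
    print("still alive after being cancelled")

async def main():
    task = asyncio.create_task(stubborn())
    await asyncio.sleep(0)  # let the task reach its sleep
    task.cancel()
    await asyncio.sleep(1)  # the "cancelled" task finishes normally instead

asyncio.run(main())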
The Trio library takes the opposite approach; all cancellations are level-triggered. Let's port the sleeping example above to use Trio instead:
import trio

event = trio.Event()

async def task():
    try:
        event.set()
        await trio.sleep(60)
    finally:
        await trio.sleep(5)
        print("slept for 5s")

async def main():
    async with trio.open_nursery() as n:
        n.start_soon(task)
        await event.wait()
        n.cancel_scope.cancel()
        await trio.sleep(10)  # Not needed, but for parity with the previous example.

trio.run(main)
Running this will produce... no output. It won't wait either, because anything that could wait has been cancelled. If you add a print() between the event.wait and the cancel_scope.cancel(), that will print something too, so it's not exiting early because it's not running anything.
This then raises a question: how do you do graceful cleanup? With shielded cancel scopes and timeouts. I'll replace the finally block above with one of those:
    finally:
        with trio.move_on_after(1, shield=True):
            await trio.sleep(5)
        print("slept for 1s?")
        await trio.sleep(5)
        print("slept for 5s?")
Running this will print slept for 1s?, but nothing more. The code running inside the context manager ignored the outside cancellation, but was re-cancelled after a second anyway. This once again nets you the best of both worlds: cancellations aren't swallowed unless you explicitly opt-in. Remember the Zen of Python: Explicit is better than implicit.
If you've ever used an asyncio application, you've probably seen the "Task was destroyed but it is pending!" message pop up before. As an example, if I Ctrl-C portage too quickly, it spits out a few of those errors. Why? Because asyncio does not keep strong references to tasks. Quoting the official documentation:
Important

Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn't referenced elsewhere may get garbage collected at any time, even before it's done. For reliable "fire-and-forget" background tasks, gather them in a collection:
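The collection the documentation has in mind looks roughly like this (paraphrased from the asyncio docs; some_coro is a placeholder):

background_tasks = set()

task = asyncio.create_task(some_coro())
# Hold a strong reference for as long as the task is alive, then let it go.
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)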
Let's take some example code:
import asyncio, gc

async def expose_bugs():
    while True:
        await asyncio.sleep(0.5)
        # Simulate doing work that would have the GC fire.
        gc.collect()

async def has_bug():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    await fut

async def main():
    t = asyncio.create_task(expose_bugs())
    asyncio.create_task(has_bug())
    await asyncio.sleep(5)

asyncio.run(main())
If you run this, it will print a warning to stderr about how has_bug was destroyed when it was pending. has_bug has no strong references to it, so when the GC runs, the weak reference the event loop holds is removed and the task is dropped on the floor. Goodbye, has_bug.
This is very obviously insane behaviour, but it can somewhat be avoided by always holding references to spawned tasks (similarly to how you can avoid segmentation faults by always doing bounds checking). But it gets worse. There's a set of helper functions that are used for corralling tasks around: wait_for, gather, or shield; these can all cause a function being waited on to be dropped on the floor because they internally spawn said function as a task and wait on that instead:
import asyncio, gc

async def expose_bugs():
    while True:
        await asyncio.sleep(0.5)
        # Simulate doing work that would have the GC fire.
        gc.collect()

async def has_bug():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    await fut

async def shield_task():
    await asyncio.shield(has_bug())

async def main():
    t1 = asyncio.create_task(expose_bugs())
    t2 = asyncio.create_task(shield_task())
    # scheduling pass
    await asyncio.sleep(1)
    t2.cancel()
    await asyncio.sleep(2)

asyncio.run(main())
When t2 is cancelled, the outer await asyncio.shield(...) call is cancelled. The cancellation doesn't propagate through into has_bug because of the shielding, and the outer task still has a strong reference in the form of t2. But has_bug's task has no strong references to it; the only reference was in the local variables of the shield() function. The next time the event loop ticks, gc.collect() is called, which drops the has_bug task entirely.
You might try to avoid this by doing create_task explicitly as this will keep a strong reference to the has_bug() task in the local variables of the cancelled generator coroutine for shield_task, like so:
async def shield_task():
    inner = asyncio.create_task(has_bug())
    await asyncio.shield(inner)
But this only works as long as the handle to t2 lives inside main(). If that handle gets dropped, then the inner has_bug will also get dropped! Adding a del t2 after the t2.cancel() will expose this immediately, as the sketch below shows. Good luck tracking this through a web of classes and tasks.
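Concretely, that failing variant of main() would look something like this:

async def main():
    t1 = asyncio.create_task(expose_bugs())
    t2 = asyncio.create_task(shield_task())
    await asyncio.sleep(1)
    t2.cancel()
    del t2  # drop the last strong reference to the outer task...
    await asyncio.sleep(2)  # ...and the inner has_bug task gets collected too

asyncio.run(main())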
The underlying API for performing network I/O is the ever-venerable BSD socket API. Python exposes a nice object-based API for working with sockets; let's look at some code that opens a connection on a socket and sends some data.
import socket

s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.connect(("2001:708:40:2001::11ba", 6667))
s.send(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
motd = s.recv(4096)
s.shutdown(socket.SHUT_RDWR)  # Graceful close, send an EOF
s.close()
This is pretty bare-bones, but it's easy to see how the code flows: top to bottom. It's a set of linear statements:

1. Open a TCP socket.
2. Set a socket option.
3. Connect to the server.
4. Send the identification data.
5. Receive the MOTD.
6. Shut down the socket gracefully, then close it.
This all happens in order; it's simple to follow. Trio offers an asynchronous version of this, so let's rewrite the code to be identical with Trio sockets:
import socket

import trio

s = trio.socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
await s.connect(("2001:708:40:2001::11ba", 6667))
await s.send(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
motd = await s.recv(4096)
s.shutdown(socket.SHUT_RDWR)  # Graceful close, schedules an EOF
s.close()
The code is almost identical, with some await statements introduced before every function that would normally block. Again, the control flow is simple; it flows from top to bottom in a linear fashion. Let's look at asyncio's version of sockets, which are called protocols:
import asyncio
from functools import partial

class IdentifyProtocol(asyncio.Protocol):
    def __init__(self, message: bytes, motd_future: asyncio.Future):
        self.message = message
        self.motd = motd_future

    def connection_made(self, transport: asyncio.WriteTransport):
        transport.write(self.message)

    def data_received(self, data: bytes):
        self.motd.set_result(data)

    def connection_lost(self, exc: BaseException | None):
        ...

loop = asyncio.get_running_loop()
fut = loop.create_future()
transport, protocol = await loop.create_connection(
    partial(IdentifyProtocol, b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n", fut),
    "2001:708:40:2001::11ba",
    6667,
)
motd = await protocol.motd
Unlike regular BSD sockets or Trio's socket wrappers, asyncio uses callbacks - synchronous ones, at that - to implement the low-level I/O primitives. The control flow here jumps around a lot:
1. create_connection is equivalent to socket.socket + socket.connect. Okay. You don't get to set socket options (at least it sets TCP_NODELAY) and it doesn't work for anything other than regular AF_INET/AF_INET6 sockets.
2. It returns a tuple of (write transport, protocol instance); the former can be used to send further data (synchronously).
3. When the socket is opened, it jumps into my class and calls (synchronously) connection_made, providing a "transport" which I can call the (synchronous) write method on to send my authentication message.
4. There's no way to wait for this to be sent as WriteTransport.write is synchronous. It'll get sent at some point in the future. Maybe. If you want to let somebody know that you've sent the message, you'll need to implement that yourself too.
5. After some time, the server will respond; the event loop calls (synchronously) data_received, providing the received data. If you want to do something with this data (asynchronously), you need to pass it to the outside world yourself using futures or queues. In this case, I've implemented it with a regular Future; I haven't even thought about how to swap the future out in a non-error-prone way for future reads yet.
6. The outside world now reads the data from the future. That's three separate places I've had to deal with the data, versus a single place with a linear order for sockets.
The biggest difference between raw sockets and protocols is that protocols have their incoming data pushed at you. If you want to simply wait for data to arrive, you need to implement that yourself! This is only a basic protocol; more complex protocols require manually implementing more complicated synchronisation mechanisms to communicate between the entirely synchronous protocol callbacks, leading to a mess of either create_task everywhere or manually shuffling futures/events around.
Why is it like this? Because Twisted was like this. But Twisted existed in a world before yield from or await, so it has an excuse. asyncio copied it in a world with yield from and await, so it has no excuse.
And no, the answer is also not "because Windows doesn't support a Unix-style select() API properly". If you want select() semantics on Windows, use \Device\Afd like everyone else does (and by everyone else, I mean the entire Javascript and Rust ecosystem).
That's true. It's rare that you'll actually interact with protocols; they are a weird implementation detail of asyncio's event loop mechanisms. The same goes for Trio sockets, but at least for sockets you can use them for esoteric mechanisms like AF_NETLINK or SOCK_RAW whilst still retaining the nice asynchronous API. (You can implement those socket types on asyncio with the even lower level APIs of add_{reader|writer}, but that's not a topic for today).
Instead most asyncio and Trio programs will use streams, a high-level generic API that treats network connections as nothing more than a stream of bytes. Here's how the previous socket example would be written using Trio's streams:
async with await trio.open_tcp_stream("irc.libera.chat", port=6667) as stream:
    # type: trio.SocketStream
    await stream.send_all(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
This is very simple; the returned stream works as an asynchronous context manager that automatically closes the socket when done, regardless of if the inner code succeeds or fails. The send_all method will automatically retry when the underlying socket returns a partial write, so the user doesn't need to implement retry logic for partial writes by hand.
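For comparison, here's roughly the retry loop that send_all does for you, sketched against a raw Trio socket (the helper name is mine):

import trio

async def send_all_by_hand(sock: trio.socket.SocketType, data: bytes) -> None:
    # send() may accept only part of the buffer; keep going until it's all gone.
    sent = 0
    while sent < len(data):
        sent += await sock.send(data[sent:])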
Here's how you do it in asyncio:
reader, writer = await asyncio.open_connection("irc.libera.chat", port=6667)
try:
    writer.write(b"USER abc 0 0 :def\r\nNICK :gquuuuuux\r\n")
    await writer.drain()
finally:
    writer.close()
    await writer.wait_closed()
This is similar to the Trio example with two major differences:
1. writer.write is synchronous and does not actually perform a full write unless drain() is called.
2. writer.close does not actually perform a close, only schedules it, and you need to use wait_closed to ensure the stream is closed.
Also, wait_closed will block if the drain method is cancelled. The cancellation issues are everywhere.
The write/drain pair exists entirely as a footgun for anyone who forgets to call drain. Data may get written in the background if you don't call drain(), but if you're in a tight loop with lots of data to write and no other await calls, it will buffer all of that data into the stream's internal buffer without sending it. Even if you do have the writer task rescheduled, the buffer may still fill up anyway if data is being written faster than the background writer can empty it. This is stupid!
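A sketch of the footgun, with chunks standing in for whatever you're producing:

# Never yields to the event loop: every chunk piles up in the transport buffer.
for chunk in chunks:
    writer.write(chunk)

# Applies backpressure: drain() suspends this task when the buffer gets full.
for chunk in chunks:
    writer.write(chunk)
    await writer.drain()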
It's a not too dissimilar situation with close/wait_closed; close() schedules a close and wait_closed waits for that close to actually be sent. What happens if wait_closed is cancelled? asyncio doesn't really define the semantics for this, unlike the Trio world which very explicitly does. In the Trio world, all closeable objects follow the AsyncResource ABC, which defines an idempotent aclose method that must always succeed.
So what happens for protocols such as TLS that need a graceful goodbye message sent? Trio's SSL helpers will try and send a graceful close, and if that times out the stream will be severed by force instead. The end-user doesn't need to know anything about this; they can call aclose on a resource to close it and not worry about if it will be cancelled or if the resource is actually closed.
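A sketch of what that convention looks like from the implementor's side, assuming hypothetical _send_goodbye() and _force_close() helpers on the resource:

async def aclose(self) -> None:
    try:
        # Give the graceful goodbye a bounded amount of time, shielded from
        # whatever cancellation caused the close in the first place.
        with trio.move_on_after(5, shield=True):
            await self._send_goodbye()  # hypothetical graceful shutdown
    finally:
        self._force_close()  # hypothetical; always severs the connection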
asyncio.Queue is difficult to use

I have two tasks: a producer (that makes messages) and a consumer (that eats messages). Here they are:
async def producer():
    while True:
        message = await do_some_networking_thing()
        # i don't know how to send a message...

async def consumer():
    while True:
        message = ...  # i don't know how to receive a message...
        await eat(message)
How do I get messages between them? I could use a Future, but that would only work exactly once and both of these functions are running in a loop. I could find a way to ferry Future instances between them, but if I could do that I would use the ferry to communicate the messages instead.
The solution is an asyncio.Queue, which is the asynchronous version of queue.Queue (which is the Python version of java.util.concurrent.ArrayBlockingQueue). Let's pass a queue to both functions:
async def producer(queue: asyncio.Queue):
    while True:
        message = await do_some_networking_thing()
        await queue.put(message)

async def consumer(queue: asyncio.Queue):
    while True:
        message = await queue.get()
        await eat(message)

async def main():
    queue = asyncio.Queue()
    t1 = asyncio.create_task(producer(queue))
    t2 = asyncio.create_task(consumer(queue))
    while True:
        await asyncio.sleep(99999999)

asyncio.run(main())
This will have the producer loop forever creating items and putting them in the queue, and the consumer will loop forever reading items from the queue and doing something with them. This is a very common pattern which is similar to communicating sequential processes. But what happens if consumer throws an exception in eat? Let's go over the control flow:
1. producer produces an item and sends it to the queue.
2. consumer receives an item and calls eat.
3. eat raises an exception and the consumer task dies. For the sake of understanding, this exception is a transient external exception and is not related to either the code or the item being consumed.
4. producer produces an item and sends it to the queue.
5. producer produces an item and sends it to the queue.
6. producer produces an item and sends it to the queue.

This is because the consumer exerts no backpressure on the producer; the producer will gladly keep sending items into the queue forever that nobody is listening to. I can add some backpressure by using asyncio.Queue(maxsize=1), which changes the control flow like so:
1. producer produces an item and sends it to the queue.
2. consumer receives an item and calls eat.
3. eat raises an exception and the consumer task dies.
4. producer produces an item and sends it to the queue.
5. producer produces an item, tries sending it to the queue, but blocks forever because there's nobody reading from the queue.

That's a little bit better in the sense it won't leak memory forever, but instead it will lock up forever because the producer has no way of knowing that the consumer isn't listening anymore. In Python 3.13 the Queue.shutdown method was added which lets one (or both) sides know that the queue is closed and can't accept (or receive) any new items. Let's adjust the code to use that:
If you're stuck on Python 3.12 or earlier, there's no Queue.shutdown available.
async def consumer(queue: asyncio.Queue):
    while True:
        message = await queue.get()
        try:
            await eat(message)
        except:
            queue.shutdown()
            raise
Now the control flow goes as follows:
1. producer produces an item and sends it to the queue.
2. consumer receives an item and calls eat.
3. eat raises an exception and the consumer task dies.
4. producer produces an item and tries sending it to the queue, but fails because the queue is shut down.

Except... that's not true. There's a race condition going on between steps three and four; if producer puts an item into the queue before the consumer task is killed, then the item that was sent to the queue remains there forever. There's a pair of methods, join() and task_done, that can solve this, meaning my code now looks like this:
async def producer(queue: asyncio.Queue):
    while True:
        message = await do_some_networking_thing()
        await queue.put(message)
        await queue.join()

async def consumer(queue: asyncio.Queue):
    while True:
        try:
            message = await queue.get()
            await eat(message)
        except:
            queue.shutdown(immediate=True)
            raise
        else:
            queue.task_done()
And the control flow goes as follows:
1. producer produces an item and sends it to the queue.
2. producer begins blocking until the consumer calls task_done.
3. consumer receives an item and calls eat.
4. eat raises an exception and the consumer task dies. The queue is shut down.
5. queue.join wakes up because I passed immediate=True. If I didn't pass that, it would block forever instead.
6. producer produces an item and tries sending it to the queue, but put fails because the queue is shut down.

This eliminates the race condition entirely. This isn't a very useful pattern because with one consumer and one producer it can be generalised into just calling the consumer function from the producer. It would be more useful if I add a second consumer, assuming consumers are slower than the producer.
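Spawning the second consumer might look something like this:

async def main():
    queue = asyncio.Queue(maxsize=1)
    tasks = [
        asyncio.create_task(producer(queue)),
        asyncio.create_task(consumer(queue)),  # consumer task 1
        asyncio.create_task(consumer(queue)),  # consumer task 2
    ]
    await asyncio.gather(*tasks)

asyncio.run(main())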
The control flow now goes as follows:

1. producer produces an item and sends it to the queue.
2. producer begins blocking until the consumer calls task_done.
3. Consumer task 1 receives the item and calls eat.
4. eat raises an exception; consumer task 1 dies and shuts the queue down, which also takes down the perfectly healthy consumer task 2.

To fix this, consumer task 1 won't shut down the queue but will restart itself, perhaps from an external supervisor.
async def consumer(queue: asyncio.Queue):
    while True:
        try:
            message = await queue.get()
            await eat(message)
        except Exception:
            logger.exception("consumer died")
            return
        else:
            queue.task_done()
Let's look at the control flow for a final time:
1. producer produces an item and sends it to the queue.
2. producer begins blocking until the consumer calls task_done.
3. Consumer task 1 receives the item and calls eat.
4. eat raises an exception; consumer task 1 logs it and returns without ever calling task_done, while consumer task 2 is still blocked on get.
5. producer continues blocking on join.
6. Nothing else is ever put on the queue, so consumer task 2 stays blocked on get, the producer stays blocked on join, and nothing makes progress.

This could be fixed by making the first consumer task try and re-insert an item on an exception, but what happens if the second task has had an error? Deadlocks. At this point, I give up and pull in an AMQP server instead of dealing with in-library queues.
What I'm really looking for is a combination of the following:
- Backpressure, so that the producer can never outrun the consumers.
- A way for each side to know when the other side has gone away, so that nothing blocks forever on a queue nobody is reading from.
- Knowing when an item has actually been handed over, without manually pairing every get with task_done and join (.join()).

Trio's channels implement these behaviours. Let's re-write the consumer/producer pattern to use channels:
from functools import partial

import trio

async def producer(channel: trio.MemorySendChannel[Message]):
    async with channel:
        while True:
            message: Message = await do_some_networking_thing()
            await channel.send(message)

async def consumer(channel: trio.MemoryReceiveChannel[Message]):
    async with channel:
        while True:
            result = await channel.receive()
            try:
                await do_something(result)
            except Exception:
                logger.exception("consumer died")
                return

async def main():
    send, receive = trio.open_memory_channel[Message](max_buffer_size=0)
    async with trio.open_nursery() as n:
        for _ in range(5):
            consumer_channel = receive.clone()
            n.start_soon(partial(consumer, consumer_channel))
        n.start_soon(partial(producer, send))
Trio channels with a buffer size of zero act as transfer queues, a name coined by Java 7 (released in 2011 (!!)), where the sender always waits for a receiver to take a message from the channel. Each receiver gets its own unique clone of the channel that can be independently cloned and messages are sent from the sender channel in a round-robin fashion. These clones can be independently closed without affecting the other cloned channels; only once the final receive channel is closed will the sending channel begin raising errors. TransferQueue was created four solid years before asyncio existed. I really see no excuse for this behaviour to have existed when asyncio was being developed.
The only problem this doesn't solve is that if the consumer has an error after receiving an object, that object stays unprocessed. This is a problem with both implementations and channels don't (yet) fix this; but there's nothing in the conceptual model that would prevent some form of RetryingChannel class that blocks the producer until an item is eventually processed. The same can't really be said of Queues, which will always buffer at least one item no matter what you do.
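As a sketch of that hypothetical RetryingChannel idea, built from two zero-buffer memory channels (the class and its methods are made up, not real Trio API):

import trio

class RetryingChannel:
    # Hypothetical: send() only returns once a consumer has actually finished
    # processing the item, and the same item is re-sent if processing fails.
    def __init__(self) -> None:
        self._items_send, self._items_receive = trio.open_memory_channel(0)
        self._acks_send, self._acks_receive = trio.open_memory_channel(0)

    async def send(self, item) -> None:
        while True:
            await self._items_send.send(item)
            if await self._acks_receive.receive():
                return  # a consumer confirmed it processed the item

    async def process_one(self, fn) -> None:
        item = await self._items_receive.receive()
        try:
            await fn(item)
        except Exception:
            await self._acks_send.send(False)  # tell the producer to retry
            raise
        else:
            await self._acks_send.send(True)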
A more detailed look at all the issues with backpressure can be read in this post by the Trio creator.
Whilst those four areas are some of the worst parts of asyncio, there are a lot of minor warts that make it unpleasant to use everywhere else.
It is an inevitability that asynchronous code needs to use threads for computationally intensive code or for libraries that still use blocking I/O. asyncio offers two APIs for this:
- asyncio.to_thread, which propagates context variables correctly to worker threads but doesn't let you specify the concurrent.futures.ThreadPoolExecutor to use.
- loop.run_in_executor, which doesn't propagate context variables but does let you specify the ThreadPoolExecutor to use; you need to wrap every function you're passing in a Context.run call.
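You can paper over the gap yourself, though; this sketch (the helper name is mine) is essentially what to_thread does internally, just with an executor parameter:

import asyncio
import contextvars
from functools import partial

async def run_in_my_executor(executor, func, *args):
    # Copy the caller's context and run func inside it on the chosen executor.
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    return await loop.run_in_executor(executor, partial(ctx.run, func, *args))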
这种权衡非常小众,但实际上也没有必要存在。线程更严重的问题来自于从线程回调到事件循环;取消操作无法正确传播!举个例子:
import asyncio
from functools import partial

async def coro():
    await asyncio.sleep(5)
    print("as if i would be cancelled!")

def in_thread(loop: asyncio.AbstractEventLoop):
    fut = asyncio.run_coroutine_threadsafe(coro(), loop)
    fut.result()

async def main():
    t = asyncio.create_task(asyncio.to_thread(partial(in_thread, asyncio.get_running_loop())))
    await asyncio.sleep(0)
    t.cancel()
    await asyncio.sleep(7)

asyncio.run(main())
Running this will print as if i would be cancelled! because cancelling the to_thread task will not cancel the coroutine that the thread scheduled on the event loop. Let's look at how Trio does it:
from functools import partial

import trio
import trio.from_thread
import trio.to_thread

async def async_task():
    await trio.sleep(5)
    print("looks like I survived being cancelled")
    return 1

def sync_task():
    try:
        ret = trio.from_thread.run(async_task)
    except BaseException as e:
        print("raised", e)
    else:
        print("returned", ret)

async def main():
    async with trio.open_nursery() as group:
        group.start_soon(partial(trio.to_thread.run_sync, sync_task))
        await trio.sleep(1)
        group.cancel_scope.cancel()

trio.run(main)
Cancelling the outer cancel scope will cancel the inner task and this code will print raised Cancelled as the exception (correctly) propagates outwards into the sync_task function.
asyncio's Unix signal API consists entirely of loop.add_signal_handler, which takes a callback and schedules it on the event loop when a single signal is received; and loop.remove_signal_handler, which rips out a handler for the specific signal manually. Compare this to Trio's open_signal_receiver API, which lets you listen to multiple signals with one object, uses an asynchronous context manager to ensure that the handler is cleaned up, and is an iterator instead of a callback so the control flow is linear and easier to follow.
Eager tasks were a performance optimisation where create_task runs a task up to its first suspension point immediately, as opposed to lazy tasks, which will not run until the next tick of the event loop.
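For reference, eager tasks are opt-in through the task factory (Python 3.12+); something along these lines:

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    # With the eager factory, create_task() runs the coroutine synchronously
    # up to its first suspension point instead of waiting for the next tick.
    loop.set_task_factory(asyncio.eager_task_factory)
    await asyncio.create_task(asyncio.sleep(0))

asyncio.run(main())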
Unfortunately, they were broken on release (1, 2) when interacting with TaskGroup, and libraries often depend on the explicit semantics of lazy tasks that have existed up to the present day.
Speaking of TaskGroups, they are a mechanism to enforce structured concurrency in an asyncio world. But due to asyncio's lack of block-based cancellation - it only supports cancellation of single tasks - there's no way to cancel entire task groups. You have to cancel the task running the TaskGroup instead, which doesn't work if you only want to cancel a nested TaskGroup and not the root one.
Trio does not have this issue because it has scope cancellation instead. Code running inside a CancelScope context manager can be cancelled independently, regardless of how nested it is inside a task, instead of needing the entire task to be cancelled at the very top level.
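A minimal sketch of what that block-based cancellation looks like in Trio:

import trio

async def main():
    with trio.CancelScope() as scope:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(trio.sleep_forever)
            await trio.sleep(1)
            # Cancels everything inside the scope, including the nursery's
            # children, without touching the rest of the task that opened it.
            scope.cancel()
    print("the surrounding task keeps running")

trio.run(main)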
asyncio is not a good library. It is full of sharp edges everywhere, with implementation details leaking out and poorly designed APIs forcing end users into odd code patterns to avoid fundamental flaws in the interfaces.
Trio fixes nearly every single issue in this post. AnyIO implements Trio-like semantics on top of asyncio, whilst still letting you use most parts of libraries designed for asyncio.