这个问题是由我的另一个问题:如何在cdef等待?

网络上有大量关于asyncio的文章和博客文章,但它们都非常肤浅。我找不到任何关于asyncio实际是如何实现的,以及什么使I/O异步的信息。我试图阅读源代码,但它有数千行不是最高级的C代码,其中很多处理辅助对象,但最重要的是,它很难将Python语法和它将转换成的C代码联系起来。

Asycnio自己的文档就更没有帮助了。这里没有关于它如何工作的信息,只有一些关于如何使用它的指南,这些指南有时也会误导/写得很糟糕。

我熟悉Go的协程实现,并希望Python也能做同样的事情。如果是这样的话,我在上面链接的帖子中出现的代码应该是有效的。既然它没有,我现在正试图找出原因。到目前为止,我最好的猜测如下,请纠正我的错误:

Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting coroutine. Perhaps, async def is actually split into multiple methods by await statements, where the object, on which these methods are called is able to keep track of the progress it made through the execution so far. If the above is true, then, essentially, execution of a coroutine boils down to calling methods of coroutine object by some global manager (loop?). The global manager is somehow (how?) aware of when I/O operations are performed by Python (only?) code and is able to choose one of the pending coroutine methods to execute after the current executing method relinquished control (hit on the await statement).

换句话说,这是我试图将一些asyncio语法“糖化”成更容易理解的东西:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Should my guess prove correct: then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter? What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?


这一切都归结为asyncio正在解决的两个主要挑战:

如何在一个线程中执行多个I/O ? 如何实现协同多任务处理?

第一点的答案已经存在很长一段时间了,被称为选择循环。在python中,它在selectors模块中实现。

第二个问题与协程的概念有关,即可以停止执行并稍后恢复的函数。在python中,协程是使用生成器和yield from语句实现的。这就是隐藏在async/await语法背后的东西。

在这个答案中有更多的资源。


编辑:解决您对goroutines的评论:

在asyncio中最接近于goroutine的实际上不是协程,而是任务(请参阅文档中的区别)。在python中,协程(或生成器)不知道事件循环或I/O的概念。它只是一个函数,可以使用yield停止执行,同时保持当前状态,以便稍后恢复。语法的yield允许以透明的方式将它们链接起来。

现在,在一个asyncio任务中,链的最底部的协程总是最终产生一个future。然后,这个future出现在事件循环中,并集成到内部机制中。当future被其他内部回调设置为done时,事件循环可以通过将future发送回协程链来恢复任务。


编辑:解决你帖子中的一些问题:

在这种情况下,I/O实际上是如何发生的?在一个单独的线程?整个解释器挂起,I/O发生在解释器之外吗?

不,线程中没有发生任何事情。I/O总是由事件循环管理,主要是通过文件描述符。然而,这些文件描述符的注册通常被高级协程隐藏,这就为您带来了麻烦。

I/O到底是什么意思?如果我的python过程调用C open()过程,它反过来将中断发送给内核,放弃对它的控制,python解释器如何知道这一点,并能够继续运行一些其他代码,而内核代码进行实际的I/O,直到它唤醒最初发送中断的python过程?原则上,Python解释器如何意识到这种情况?

I/O是任何阻塞调用。在asyncio中,所有的I/O操作都应该经过事件循环,因为如您所说,事件循环无法意识到某些同步代码中正在执行阻塞调用。这意味着您不应该在协程的上下文中使用同步打开。相反,使用专用的库,如aiofiles,它提供了open的异步版本。

你的coro糖化是正确的概念,但略不完整。

Await不会无条件挂起,只在遇到阻塞调用时挂起。它是如何知道呼叫被阻塞的?这是由正在等待的代码决定的。例如,socket read的可等待实现可以被糖化为:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

在实际的asyncio中,等效代码修改Future的状态,而不是返回神奇的值,但概念是相同的。当适当地适应类似生成器的对象时,可以等待上面的代码。

在调用方,当你的协程包含:

data = await read(sock, 1024)

它糖化成类似的东西:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

熟悉发电机的人倾向于描述上述方面的产量,从其中自动暂停。

挂起链一直延续到事件循环,该事件循环注意到协程被挂起,将其从可运行集中移除,并继续执行可运行的协程(如果有的话)。如果没有可运行的协程,则循环在select()中等待,直到协程感兴趣的文件描述符准备好进行IO或超时。(事件循环维护一个文件描述符到协程的映射。)

在上面的例子中,一旦select()告诉事件循环sock是可读的,它将重新将coro添加到可运行集,因此它将从挂起点继续执行。

换句话说:

默认情况下,所有事情都发生在同一个线程中。 事件循环负责调度协程,并在协程正在等待的任何事情(通常是一个通常会阻塞的IO调用或超时)准备就绪时唤醒协程。

为了深入了解协同程序驱动事件循环,我推荐Dave Beazley的演讲,他在现场观众面前演示了从头开始编写事件循环。

asyncio是如何工作的?

在回答这个问题之前,我们需要了解一些基本术语,如果你已经知道其中任何一个,就跳过这些。

发电机

生成器是允许我们暂停python函数执行的对象。用户管理生成器使用关键字yield实现。通过创建一个包含yield关键字的普通函数,我们将该函数转换为生成器:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

如您所见,在生成器上调用next()会导致解释器加载测试的帧,并返回产生的值。再次调用next()将导致该帧再次加载到解释器堆栈中,并继续产生另一个值。

在第三次调用next()时,生成器已经完成,并抛出StopIteration。

与发电机通信

生成器的一个鲜为人知的特性是,您可以使用两个方法与它们通信:send()和throw()。

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

在调用gen.send()时,该值作为yield关键字的返回值传递。

另一方面,gen.throw()允许在生成器内部抛出异常,在yield被调用的同一点引发异常。

从生成器返回值

从生成器返回一个值,导致该值被放入StopIteration异常中。我们可以稍后从异常中恢复值,并将其用于我们的需要。

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

看,一个新的关键词:屈服

Python 3.4添加了一个新的关键字:yield from。该关键字允许我们做的是将任何next(), send()和throw()传递到最内部嵌套的生成器中。如果内部生成器返回一个值,它也是yield from的返回值:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

我已经写了一篇文章来进一步阐述这个主题。

把它们放在一起

在Python 3.4中引入新的关键字yield后,我们现在能够在生成器中创建生成器,就像隧道一样,将数据从最内部的生成器来回传递到最外部的生成器。这为生成器带来了一个新的含义——协程。

协程是可以在运行时停止和恢复的函数。在Python中,它们是使用async def关键字定义的。就像发电机一样,它们也使用自己的等待产量形式。在Python 3.5引入async和await之前,我们以与生成器创建完全相同的方式创建协程(使用yield from而不是await)。

async def inner():
    return 1

async def outer():
    await inner()

就像所有迭代器和生成器实现__iter__()方法一样,所有协程都实现__await__(),这允许它们在每次await coro被调用时继续执行。

在Python文档中有一个很好的序列图,你应该去看看。

在asyncio中,除了协程函数,我们还有两个重要的对象:任务和未来。

期货

期货是实现了__await__()方法的对象,它们的工作是保存特定的状态和结果。状态可以是以下状态之一:

PENDING - future没有任何结果或异常集。 CANCELLED -使用fut.cancel()取消未来 FINISHED - future被结束,通过使用fut.set_result()的结果集或使用fut.set_exception()的异常集完成。

正如您所猜测的那样,结果可以是一个将返回的Python对象,也可以是一个可能引发的异常。

未来对象的另一个重要特性是它们包含一个名为add_done_callback()的方法。此方法允许在任务完成时立即调用函数——无论它引发异常还是完成。

任务

任务对象是特殊的期货,它围绕着协程,并与最内部和最外部的协程通信。每次协程等待future时,future就会一直传递给任务(就像yield from一样),然后任务接收它。

接下来,任务将自己绑定到未来。它通过在将来调用add_done_callback()来做到这一点。从现在开始,如果将来要完成,无论是取消,传递异常,还是传递一个Python对象,任务的回调将被调用,它将上升到存在。

Asyncio

我们必须回答的最后一个紧迫问题是——IO是如何实现的?

在asyncio的深处,我们有一个事件循环。任务的事件循环。事件循环的工作是在每次任务准备就绪时调用它们,并将所有工作协调到一个工作机器中。

事件循环的IO部分构建在一个称为select的关键函数之上。Select是一个阻塞函数,由下面的操作系统实现,允许在套接字上等待传入或传出数据。在接收到数据时,它将被唤醒,并返回接收到数据的套接字或准备写入的套接字。

当您尝试通过asyncio通过套接字接收或发送数据时,下面实际发生的情况是,首先检查套接字是否有任何可以立即读取或发送的数据。如果它的.send()缓冲区已满,或者.recv()缓冲区为空,套接字就会注册到select函数(只需将其添加到其中一个列表中,rlist用于recv, wlist用于send),相应的函数将等待一个新创建的future对象,该对象与该套接字绑定。

当所有可用任务都在等待未来时,事件循环调用select并等待。当其中一个套接字有传入数据,或者它的发送缓冲区耗尽时,asyncio检查绑定到该套接字的未来对象,并将其设置为done。

现在奇迹发生了。未来被设置为完成,之前使用add_done_callback()添加自己的任务将复活,并在协程上调用.send(),该协程将恢复最内部的协程(因为等待链),您将从它溢出到的附近缓冲区读取新接收的数据。

在recv()的情况下,再次使用方法链:

选择。选择等待。 返回一个就绪的套接字,其中包含数据。 来自套接字的数据被移动到缓冲区中。 调用Future.set_result()。 使用add_done_callback()添加自己的任务现在被唤醒。 Task在协程上调用.send(),它会一直进入最内部的协程并唤醒它。 数据从缓冲区读取并返回给我们的普通用户。

总之,asyncio使用生成器功能,允许暂停和恢复函数。它使用yield from功能,允许将数据从最内部的生成器来回传递到最外部的生成器。它使用所有这些方法是为了在等待IO完成时暂停函数执行(通过使用OS选择函数)。

最好的是什么?当一个函数暂停时,另一个函数可能会运行并与精致的结构交织,这是asyncio的。

谈论async/await和asyncio不是一回事。前者是一个基本的、低级的构造(协程),而后者是一个使用这些构造的库。相反,没有唯一的最终答案。

下面是async/await和asyncio-like库如何工作的一般描述。也就是说,上面可能还有其他技巧(有……),但它们都是无关紧要的,除非你自己创建它们。这种差异应该可以忽略不计,除非你已经知道得足够多,不必问这样的问题。

1. 简而言之,协程与子例程

就像子例程(函数,过程,…)一样,协程(生成器,…)是调用堆栈和指令指针的抽象:有一个执行代码段的堆栈,每个代码段都位于特定的指令上。

def和async def的区别仅仅是为了清晰。实际的差别是回报和收益率。因此,从单个调用到整个堆栈的差异中await或yield。

1.1. 子例程

子例程表示一个新的堆栈级别,以保存局部变量,并对其指令进行一次遍历以达到结束。考虑这样的子程序:

def subfoo(bar):
     qux = 3
     return qux * bar

当你运行它时,这意味着

为bar和qux分配堆栈空间 递归地执行第一个语句并跳转到下一个语句 每次返回时,将其值推入调用堆栈 清除堆栈(1.)和指令指针(2.)

值得注意的是,4。意味着子程序总是以相同的状态开始。函数本身专有的所有内容在完成时都将丢失。一个函数不能被恢复,即使在返回后有指令。

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. 协程作为持久子例程

协程类似于子例程,但可以在不破坏其状态的情况下退出。考虑这样一个协程:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

当你运行它时,这意味着

为bar和qux分配堆栈空间 递归地执行第一个语句并跳转到下一个语句 一旦达到yield,将其值推入调用堆栈,但存储堆栈和指令指针 一旦调用yield,恢复堆栈和指令指针,并将参数推入qux 每次返回时,将其值推入调用堆栈 清除堆栈(1.)和指令指针(2.)

注意2.1和2.2的添加—协程可以在预定义的点挂起和恢复。这与子例程在调用另一个子例程时挂起类似。不同之处在于活动协程没有严格绑定到它的调用堆栈。相反,挂起的协程是一个单独的、孤立的堆栈的一部分。

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

这意味着挂起的协程可以在堆栈之间自由存储或移动。任何访问协程的调用栈都可以决定恢复它。

1.3. 遍历调用堆栈

到目前为止,我们的协程只在yield下到调用堆栈。子例程可以使用return和()在调用堆栈中上下移动。为了完整起见,协程还需要一种机制来上升到调用堆栈。考虑这样一个协程:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

当您运行它时,这意味着它仍然像子例程一样分配堆栈和指令指针。当它挂起时,这仍然像存储一个子程序。

然而,yield from两者都有。它挂起堆栈和wrap的指令指针,并运行cofoo。注意,在cofoo完全完成之前,wrap一直处于暂停状态。每当cofoo挂起或发送一些东西时,cofoo直接连接到调用堆栈。

1.4. 协程一直到下面

在建立时,yield from允许跨另一个中间作用域连接两个作用域。当递归应用时,这意味着堆栈的顶部可以连接到堆栈的底部。

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

注意,root和coro_b彼此不知道。这使得协程比回调简洁得多:协程仍然像子例程一样建立在1:1的关系上。协程挂起并恢复其整个现有的执行堆栈,直到常规调用点。

值得注意的是,root可以恢复任意数量的协程。然而,它永远不能同时恢复多个。相同根的协程是并发的,但不是并行的!

1.5. Python的async和await

到目前为止,解释已经明确地使用了生成器的yield和yield from词汇表-底层功能是相同的。新的Python3.5语法async和await的存在主要是为了清晰。

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

async for和async with语句是必需的,因为你会用裸for和with语句打破yield from/await链。

2. 一个简单事件循环的剖析

就其本身而言,协程没有将控制权交给另一个协程的概念。它只能在协程堆栈的底部将控制权交给调用方。然后,调用者可以切换到另一个协程并运行它。

几个协程的根节点通常是一个事件循环:在挂起时,协程产生一个它想要恢复的事件。反过来,事件循环能够有效地等待这些事件发生。这允许它决定接下来运行哪个协程,或者在恢复之前如何等待。

这样的设计意味着循环可以理解一组预定义的事件。几个协程相互等待,直到最后等待一个事件。此事件可以通过交出控制权直接与事件循环通信。

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

关键是协程挂起允许事件循环和事件直接通信。中间协程栈不需要知道哪个循环在运行它,也不需要知道事件是如何工作的。

2.1.1. 时间事件

最简单的事件是到达某个时间点。这也是线程代码的一个基本块:线程重复地休眠,直到条件为真。 然而,常规睡眠本身会阻塞执行——我们希望其他协程不被阻塞。相反,我们希望告诉事件循环什么时候应该恢复当前协程堆栈。

2.1.2. 定义事件

事件只是一个我们可以识别的值——通过枚举、类型或其他标识。我们可以用一个存储目标时间的简单类来定义它。除了存储事件信息外,我们还可以允许直接等待类。

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self
    
    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

这个类只存储事件——它没有说明如何实际处理它。

唯一的特殊功能是__await__ -它是await关键字所寻找的。实际上,它是一个迭代器,但不适用于常规迭代机制。

2.2.1. 等待事件

现在我们有了一个事件,协程如何对它做出反应?我们应该能够通过等待事件来表达睡眠的等效内容。为了更好地了解发生了什么,有一半的时间我们会等待两次:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

我们可以直接实例化并运行这个协程。类似于生成器,使用协程。Send运行协程,直到产生结果。

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

这为我们提供了两个AsyncSleep事件,然后在协程完成时提供了一个StopIteration。请注意,唯一的延迟来自时间。在循环中睡觉!每个AsyncSleep只存储当前时间的偏移量。

2.2.2. 事件+睡眠

在这一点上,我们有两个独立的机制可供我们使用:

可从协程内部产生的AsyncSleep事件 时间。可以等待而不影响协程的睡眠

值得注意的是,这两者是正交的:一个不会影响或触发另一个。因此,我们可以提出自己的睡眠策略来满足AsyncSleep的延迟。

2.3. 一个简单的事件循环

如果我们有几个协程,每个协程都可以告诉我们它什么时候想要被唤醒。然后,我们可以等待其中的第一个,然后等待下一个,依此类推。值得注意的是,在每个点上,我们只关心下一个是哪个。

这就形成了一个简单的调度:

按所需的唤醒时间对协程进行排序 选第一个想醒来的 等到这个时间点 运行这个协程 从1开始重复。

简单的实现不需要任何高级概念。列表允许按日期对协程进行排序。等待是固定的时间。运行协程就像之前使用corroutine .send一样。

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

当然,这方面还有很大的改进空间。我们可以为等待队列使用堆,或为事件使用分派表。我们还可以从StopIteration中获取返回值并将它们分配给协程。然而,基本原则是不变的。

2.4. 合作等

AsyncSleep事件和run事件循环是计时事件的完全工作实现。

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

这会在五个协程之间进行合作切换,每个协程暂停0.1秒。即使事件循环是同步的,它仍然在0.5秒而不是2.5秒内执行工作。每个协程保存状态并独立行动。

3.I/O事件循环

支持睡眠的事件循环适用于轮询。然而,在文件句柄上等待I/O可以更有效地完成:操作系统实现I/O,因此知道哪些句柄已经准备好了。理想情况下,事件循环应该支持显式的“ready for I/O”事件。

3.1. select调用

Python已经有一个接口来查询操作系统的读I/O句柄。当使用句柄来读取或写入时,它返回准备读取或写入的句柄:

readable, writable, _ = select.select(rlist, wlist, xlist, timeout)

例如,我们可以打开一个文件并等待它准备好:

write_target = open('/tmp/foo')
readable, writable, _ = select.select([], [write_target], [])

一旦select返回,writable包含我们打开的文件。

3.2. 基本I/O事件

类似于AsyncSleep请求,我们需要为I/O定义一个事件。使用底层的选择逻辑,事件必须引用一个可读的对象——比如一个打开的文件。此外,我们还存储要读取的数据量。

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = b'' if 'b' in file.mode else ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

与AsyncSleep一样,我们主要只存储底层系统调用所需的数据。这一次,__await__可以被多次恢复,直到读取所需的量为止。此外,我们返回I/O结果,而不是仅仅恢复。

3.3. 用读I/O增加事件循环

事件循环的基础仍然是前面定义的运行。首先,我们需要跟踪读请求。这不再是一个排序的调度,我们只将读请求映射到协程。

# new
waiting_read = {}  # type: Dict[file, coroutine]

因为选择。Select有一个timeout参数,我们可以用它来代替time.sleep。

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])

这为我们提供了所有可读文件——如果有,我们运行相应的协程。如果没有,我们已经等待了足够长的时间来运行当前的协程。

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

最后,我们必须实际监听读请求。

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. 把它们放在一起

上面的描述有点简单化。我们需要做一些切换,如果我们总能读取,就不会饿死睡眠协程。我们需要面对无书可读、无事可等的现实。但是,最终结果仍然适合30 LOC。

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. 合作的I / O

AsyncSleep, AsyncRead和run实现现在完全可以用于睡眠和/或读取。 和困倦一样,我们可以定义一个帮助来测试阅读:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

运行这个,我们可以看到我们的I/O与等待任务交织在一起:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. 非阻塞I / O

虽然文件上的I/O理解了这个概念,但它并不真正适合asyncio这样的库:select调用总是返回文件,并且open和read都可能无限期地阻塞。这会阻塞事件循环的所有协程——这很糟糕。aiofiles等库使用线程和同步来伪造文件上的非阻塞I/O和事件。

然而,套接字确实允许非阻塞I/O,它们固有的延迟使它变得更加关键。当在事件循环中使用时,等待数据和重试可以被包装而不阻塞任何东西。

4.1. 非阻塞I/O事件

类似于我们的AsyncRead,我们可以为套接字定义一个挂起并读取事件。我们不取文件,而是取套接字——它必须是非阻塞的。而且,我们的__await__使用套接字。Recv而不是file.read。

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

与AsyncRead相比,__await__执行真正的非阻塞I/O。当数据可用时,它总是读取数据。当没有可用数据时,它总是挂起。这意味着事件循环只在我们执行有用的工作时被阻塞。

4.2. 解除事件循环阻塞

就事件循环而言,没有什么变化。要监听的事件仍然与文件事件相同——一个由select标记为ready的文件描述符。

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

此时,应该很明显AsyncRead和AsyncRecv是同一种事件。我们可以很容易地将它们重构为一个具有可交换I/O组件的事件。实际上,事件循环、协程和事件将调度程序、任意中间代码和实际I/O清晰地分开。

4.3. 非阻塞I/O的丑陋一面

原则上,此时您应该做的是将read的逻辑复制为AsyncRecv的recv。然而,现在这要丑陋得多——当函数在内核内阻塞时,你必须处理早期返回,但控制权交给你。例如,打开一个连接比打开一个文件要长得多:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

长话短说,剩下的就是几十行异常处理。此时事件和事件循环已经工作了。

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

齿顶高

github上的示例代码

什么是asyncio?

Asyncio代表异步输入输出,指的是使用单个线程或事件循环实现高并发的编程范式。 异步编程是一种并行编程,允许工作单元与主应用程序线程分开运行。当工作完成时,它通知主线程工作线程的完成或失败。

让我们看看下图:

让我们用一个例子来理解asyncio:

为了理解asyncio背后的概念,让我们考虑一家只有一个服务员的餐厅。突然,三个顾客,A, B和C出现了。他们三个人从服务员那里拿到菜单后,花了不同的时间来决定吃什么。

假设A需要5分钟,B需要10分钟,C需要1分钟。如果单身服务员先从B开始,在10分钟内为B点餐,然后他为A服务,花5分钟记录他点的菜,最后花1分钟知道C想吃什么。 所以,服务员总共要花10 + 5 + 1 = 16分钟来记下他们点的菜。但是,请注意在这个事件序列中,C在服务员到达他之前等了15分钟,A等了10分钟,B等了0分钟。

现在考虑一下,如果服务员知道每位顾客做出决定所需的时间。他可以先从C开始,然后到A,最后到b。这样每个顾客的等待时间为0分钟。 尽管只有一个服务员,但却产生了三个服务员的错觉,每个顾客都有一个服务员。

最后,服务员完成三份订单所需的总时间为10分钟,远少于另一种情况下的16分钟。

让我们来看另一个例子:

假设,国际象棋大师马格努斯·卡尔森(Magnus Carlsen)主持了一场国际象棋展览,他与多名业余棋手同场竞技。他有两种方式进行展览:同步和异步。

假设:

24的对手 马格努斯·卡尔森在5秒内走完每一步棋 每个对手有55秒的时间来移动 游戏平均30对棋(总共60步)

同步:马格努斯·卡尔森一次只玩一局,从不同时玩两局,直到游戏完成。每款游戏耗时(55 + 5)* 30 == 1800秒,即30分钟。整个展览耗时24 * 30 == 720分钟,即12个小时。

异步:马格努斯·卡尔森从一张桌子移动到另一张桌子,在每张桌子上移动一次。她离开牌桌,让对手在等待时间内采取下一步行动。Judit在所有24局游戏中的一次移动需要24 * 5 == 120秒,即2分钟。整个展览缩短到120 * 30 == 3600秒,也就是1个小时

世界上只有一个马格努斯·卡尔森(Magnus Carlsen),他只有两只手,自己一次只能走一步棋。但异步游戏将展示时间从12小时缩短至1小时。

代码示例:

让我们尝试使用代码片段演示同步和异步执行时间。

异步- async_count.py

import asyncio  
import time  
  
  
async def count():  
    print("One", end=" ")  
    await asyncio.sleep(1)  
    print("Two", end=" ")  
    await asyncio.sleep(2)  
    print("Three", end=" ")  
  
  
async def main():  
    await asyncio.gather(count(), count(), count(), count(), count())  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    asyncio.run(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

异步-输出:

One One One One One Two Two Two Two Two Three Three Three Three Three 
Executing - async_count.py
Execution Starts: 18453.442160108
Executions Ends: 18456.444719712
Totals Execution Time:3.00 seconds.

Synchronous - sync_count.py

import time  
  
  
def count():  
    print("One", end=" ")  
    time.sleep(1)  
    print("Two", end=" ")  
    time.sleep(2)  
    print("Three", end=" ")  
  
  
def main():  
    for _ in range(5):  
        count()  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    main()  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

同步-输出:

One Two Three One Two Three One Two Three One Two Three One Two Three 
Executing - sync_count.py
Execution Starts: 18875.175965998
Executions Ends: 18890.189930292
Totals Execution Time:15.01 seconds.

为什么在Python中使用asyncio而不是多线程?

It’s very difficult to write code that is thread safe. With asynchronous code, you know exactly where the code will shift from one task to the next and race conditions are much harder to come by. Threads consume a fair amount of data since each thread needs to have its own stack. With async code, all the code shares the same stack and the stack is kept small due to continuously unwinding the stack between tasks. Threads are OS structures and therefore require more memory for the platform to support. There is no such problem with asynchronous tasks.

asyncio是如何工作的?

在深入讨论之前,让我们回顾一下Python Generator

Python发电机:

包含yield语句的函数被编译为生成器。在函数体中使用yield表达式会导致该函数成为生成器。这些函数返回一个支持迭代协议方法的对象。自动创建的生成器对象接收__next()__方法。回到上一节的例子,我们可以直接在生成器对象上调用__next__,而不是使用next():

def asynchronous():
    yield "Educative"


if __name__ == "__main__":
    gen = asynchronous()

    str = gen.__next__()
    print(str)

请记住以下关于生成器的内容:

Generator functions allow you to procrastinate computing expensive values. You only compute the next value when required. This makes generators memory and compute efficient; they refrain from saving long sequences in memory or doing all expensive computations upfront. Generators, when suspended, retain the code location, which is the last yield statement executed, and their entire local scope. This allows them to resume execution from where they left off. Generator objects are nothing more than iterators. Remember to make a distinction between a generator function and the associated generator object which are often used interchangeably. A generator function when invoked returns a generator object and next() is invoked on the generator object to run the code within the generator function.

发电机状态:

生成器会经历以下几种状态:

当生成器函数第一次返回生成器对象并且迭代还没有开始时,返回GEN_CREATED。 在生成器对象上调用了GEN_RUNNING,并由python解释器执行。 GEN_SUSPENDED:当发电机以一定的产量暂停时 当生成器已完成执行或已关闭时,返回GEN_CLOSED。

生成器对象上的方法:

生成器对象公开了可以调用来操作生成器的不同方法。这些都是:

把() send () close ()

让我们深入了解更多细节

asyncio的规则:

The syntax async def introduces either a native coroutine or an asynchronous generator. The expressions async with and async for are also valid. The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g(), this is how await tells the event loop, "Suspend execution of g() until whatever I’m waiting on—the result of f()—is returned. In the meantime, go let something else run."

在代码中,第二个要点大致如下所示:

async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r

关于何时以及如何使用async/await也有一组严格的规则。无论你是否还在学习语法,或者已经使用过async/await,这些都很方便:

A function that you introduce with async def is a coroutine. It may use await, return, or yield, but all of these are optional. Declaring async def noop(): pass is valid: Using await and/or return creates a coroutine function. To call a coroutine function, you must await it to get its results. It is less common to use yield in an async def block. This creates an asynchronous generator, which you iterate over with async for. Forget about async generators for the time being and focus on getting down the syntax for coroutine functions, which use await and/or return. Anything defined with async def may not use yield from, which will raise a SyntaxError. Just like it’s a SyntaxError to use yield outside of a def function, it is a SyntaxError to use await outside of an async def coroutine. You can only use await in the body of coroutines.

以下是一些简短的例子,旨在总结上述几条规则:

async def f(x):
    y = await z(x)     # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x            # OK - this is an async generator

async def m(x):
    yield from gen(x)  # NO - SyntaxError

def m(x):
    y = await z(x)     # NO - SyntaxError (no `async def` here)
    return y

基于生成器的协程

Python创建了Python生成器和用于协程的生成器之间的区别。这些协程称为基于生成器的协程,并且需要@asynio装饰器。将协程添加到函数定义中,尽管这没有严格执行。

基于生成器的协程使用yield from语法而不是yield。协程可以:

屈服于另一个协程 未来收益 返回一个表达式 提高异常

Python中的协程使得多任务合作成为可能。 协作多任务处理是指正在运行的进程主动将CPU让给其他进程的方法。当一个进程在逻辑上被阻塞时,比如在等待用户输入时,或者当它发起了一个网络请求并将空闲一段时间时,它可能会这样做。 协程可以定义为一个特殊的函数,它可以在不丢失状态的情况下将控制权交给调用者。

那么协程和生成器之间的区别是什么呢?

生成器本质上是迭代器,尽管它们看起来像函数。一般来说,生成器和协程之间的区别是:

生成器将一个值返回给调用者,而协程将控制权交还给另一个协程,并且可以从它放弃控制权开始恢复执行。 一旦启动,生成器就不能接受参数,而协程可以。 生成器主要用于简化迭代器的编写。它们是一种协程,有时也称为半协程。

基于生成器的协程示例

我们可以编写的最简单的基于生成器的协程如下所示:

@asyncio.coroutine
def do_something_important():
    yield from asyncio.sleep(1)

协程休眠一秒。注意装饰器的使用和yield from。

本地基于协程示例

原生的意思是,该语言引入了语法来专门定义协程,使它们成为语言中的一等公民。本地协程可以使用async/await语法定义。 我们可以编写的最简单的基于本机的协程如下所示:

async def do_something_important():
    await asyncio.sleep(1)

AsyncIO设计模式

AsyncIO有自己的一组可能的脚本设计,我们将在本节中讨论。

1. 事件循环

事件循环是一种编程构造,它等待事件发生,然后将它们分派给事件处理程序。事件可以是用户单击UI按钮,也可以是启动文件下载的进程。异步编程的核心是事件循环。

示例代码:

import asyncio  
import random  
import time  
from threading import Thread  
from threading import current_thread  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[34m",  # Blue  
)  
  
  
async def do_something_important(sleep_for):  
    print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])  
    await asyncio.sleep(sleep_for)  
  
  
def launch_event_loops():  
    # get a new event loop  
  loop = asyncio.new_event_loop()  
  
    # set the event loop for the current thread  
  asyncio.set_event_loop(loop)  
  
    # run a coroutine on the event loop  
  loop.run_until_complete(do_something_important(random.randint(1, 5)))  
  
    # remember to close the loop  
  loop.close()  
  
  
if __name__ == "__main__":  
    thread_1 = Thread(target=launch_event_loops)  
    thread_2 = Thread(target=launch_event_loops)  
  
    start_time = time.perf_counter()  
    thread_1.start()  
    thread_2.start()  
  
    print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])  
  
    thread_1.join()  
    thread_2.join()  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_event_loop.py

输出:

自己尝试并检查输出,您会发现每个衍生线程都在运行自己的事件循环。

事件循环的类型

有两种类型的事件循环:

SelectorEventLoop: SelectorEventLoop基于选择器模块,是所有平台上的默认循环。 ProactorEventLoop: ProactorEventLoop基于Windows的I/O完成端口,仅在Windows上支持。

2. 期货

Future表示正在进行或将在未来被调度的计算。它是一个特殊的低级可等待对象,表示异步操作的最终结果。不要混淆线程。Future和asyncio.Future。

示例代码:

import time  
import asyncio  
from asyncio import Future  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[34m",  # Blue  
)  
  
  
async def bar(future):  
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])  
    await asyncio.sleep(3)  
    print(colors[1] + "bar resolving the future" + colors[0])  
    future.done()  
    future.set_result("future is resolved")  
  
  
async def foo(future):  
    print(colors[2] + "foo will await the future" + colors[0])  
    await future  
  print(colors[2] + "foo finds the future resolved" + colors[0])  
  
  
async def main():  
    future = Future()  
    await asyncio.gather(foo(future), bar(future))  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    asyncio.run(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_futures.py

输出:

两个协程都传递了一个future。foo()协程等待future被解析,而bar()协程则在三秒后解析future。

3.任务

任务就像未来,事实上,任务是未来的一个子类,可以使用以下方法创建:

Asyncio.create_task()接受协程并将它们包装为任务。 Loop.create_task()只接受协程。 Asyncio.ensure_future()接受未来、协程和任何可等待对象。

任务包装协程并在事件循环中运行它们。如果一个协程在等待一个Future, Task将挂起该协程的执行并等待Future完成。当Future完成时,将继续执行封装的协程。

示例代码:

import time  
import asyncio  
from asyncio import Future  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[34m",  # Blue  
)  
  
  
async def bar(future):  
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])  
    await asyncio.sleep(3)  
    print(colors[1] + "bar resolving the future" + colors[0])  
    future.done()  
    future.set_result("future is resolved")  
  
  
async def foo(future):  
    print(colors[2] + "foo will await the future" + colors[0])  
    await future  
  print(colors[2] + "foo finds the future resolved" + colors[0])  
  
  
async def main():  
    future = Future()  
  
    loop = asyncio.get_event_loop()  
    t1 = loop.create_task(bar(future))  
    t2 = loop.create_task(foo(future))  
  
    await t2, t1  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    loop = asyncio.get_event_loop()  
    loop.run_until_complete(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_tasks.py

输出:

4. 链协同程序:

协程的一个关键特性是它们可以被链接在一起。协程对象是可等待的,因此另一个协程可以等待它。这允许你把程序分解成更小的、可管理的、可回收的协程:

示例代码:

import sys  
import asyncio  
import random  
import time  
  
# ANSI colors  
colors = (  
    "\033[0m",  # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[36m",  # Cyan  
  "\033[34m",  # Blue  
)  
  
  
async def function1(n: int) -> str:  
    i = random.randint(0, 10)  
    print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
    result = f"result{n}-1"  
  print(colors[1] + f"Returning function1({n}) == {result}." + colors[0])  
    return result  
  
  
async def function2(n: int, arg: str) -> str:  
    i = random.randint(0, 10)  
    print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
    result = f"result{n}-2 derived from {arg}"  
  print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0])  
    return result  
  
  
async def chain(n: int) -> None:  
    start = time.perf_counter()  
    p1 = await function1(n)  
    p2 = await function2(n, p1)  
    end = time.perf_counter() - start  
    print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0])  
  
  
async def main(*args):  
    await asyncio.gather(*(chain(n) for n in args))  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])  
    start_time = time.perf_counter()  
    asyncio.run(main(*args))  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

请仔细注意输出,其中function1()休眠了可变的时间,function2()在结果可用时开始工作:

执行命令:python async_chained.py 11 8

输出:

5. 使用队列:

在这种设计中,没有任何个体消费者与生产者之间的链接。消费者不知道生产者的数量,甚至不知道将被添加到队列中的项目的累积数量。

单个生产者或消费者分别花费不同的时间从队列中放置和提取项目。队列作为一个吞吐量,可以与生产者和消费者进行通信,而不需要它们彼此直接通信。

示例代码:

import asyncio  
import argparse  
import itertools as it  
import os  
import random  
import time  
  
# ANSI colors  
colors = (  
    "\033[0m",  # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[36m",  # Cyan  
  "\033[34m",  # Blue  
)  
  
  
async def generate_item(size: int = 5) -> str:  
    return os.urandom(size).hex()  
  
  
async def random_sleep(caller=None) -> None:  
    i = random.randint(0, 10)  
    if caller:  
        print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
  
  
async def produce(name: int, producer_queue: asyncio.Queue) -> None:  
    n = random.randint(0, 10)  
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer  
  await random_sleep(caller=f"Producer {name}")  
        i = await generate_item()  
        t = time.perf_counter()  
        await producer_queue.put((i, t))  
        print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0])  
  
  
async def consume(name: int, consumer_queue: asyncio.Queue) -> None:  
    while True:  
        await random_sleep(caller=f"Consumer {name}")  
        i, t = await consumer_queue.get()  
        now = time.perf_counter()  
        print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0])  
        consumer_queue.task_done()  
  
  
async def main(no_producer: int, no_consumer: int):  
    q = asyncio.Queue()  
    producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)]  
    consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)]  
    await asyncio.gather(*producers)  
    await q.join()  # Implicitly awaits consumers, too  
  for consumer in consumers:  
        consumer.cancel()  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    parser = argparse.ArgumentParser()  
    parser.add_argument("-p", "--no_producer", type=int, default=10)  
    parser.add_argument("-c", "--no_consumer", type=int, default=15)  
    ns = parser.parse_args()  
    start_time = time.perf_counter()  
    asyncio.run(main(**ns.__dict__))  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_queue.py -p 2 -c 4 .执行以下命令

输出:

最后,让我们看一个asyncio如何减少等待时间的例子:给定一个协程generate_random_int(),它不断生成范围为[0,10]的随机整数,直到其中一个超出阈值,您希望让这个协程的多个调用不需要彼此连续等待完成。

示例代码:

import time  
import asyncio  
import random  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[36m",  # Cyan  
  "\033[35m",  # Magenta  
  "\033[34m",  # Blue  
)  
  
  
async def generate_random_int(indx: int, threshold: int = 5) -> int:  
    print(colors[indx + 1] + f"Initiated generate_random_int({indx}).")  
    i = random.randint(0, 10)  
    while i <= threshold:  
        print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.")  
        await asyncio.sleep(indx + 1)  
        i = random.randint(0, 10)  
    print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0])  
    return i  
  
  
async def main():  
    res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3)))  
    return res  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    start_time = time.perf_counter()  
    r1, r2, r3 = asyncio.run(main())  
    print(colors[4] + f"\nRandom INT 1: {r1}, Random INT 2: {r2}, Random INT 3: {r3}\n" + colors[0])  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_random.py

输出:

注意:如果您自己编写任何代码,最好使用本机协同程序 为了明确而不是含蓄。发电机的基础 协程将在Python 3.10中被移除。

GitHub Repo: https://github.com/tssovi/asynchronous-in-python

它允许您编写单线程异步代码,并在Python中实现并发性。基本上,asyncio为异步编程提供了一个事件循环。例如,如果我们需要在不阻塞主线程的情况下发出请求,我们可以使用asyncio库。

asyncio模块允许实现异步编程 使用以下元素的组合:

Event loop: The asyncio module allows an event loop per process. Coroutines: A coroutine is a generator that follows certain conventions. Its most interesting feature is that it can be suspended during execution to wait for external processing (the some routine in I/O) and return from the point it had stopped when the external processing was done. Futures: Futures represent a process that has still not finished. A future is an object that is supposed to have a result in the future and represents uncompleted tasks. Tasks: This is a subclass of asyncio.Future that encapsulates and manages coroutines. We can use the asyncio.Task object to encapsulate a coroutine.

asyncio中最重要的概念是事件循环。事件循环 允许您使用回调或协程编写异步代码。 理解asyncio的关键是协程和事件的术语 循环。协程是有状态函数,当另一个I/O操作正在执行时,可以停止其执行。事件循环用于协调协同例程的执行。

要运行任何协程函数,我们需要获得一个事件循环。我们可以这样做 与

    loop = asyncio.get_event_loop()

这为我们提供了一个BaseEventLoop对象。它有一个run_until_complete方法,该方法接受一个协程并运行它直到完成。然后,协程返回一个结果。在底层,事件循环执行BaseEventLoop.rununtilcomplete(future)方法。

If you picture an airport control tower, with many planes waiting to land on the same runway. The control tower can be seen as the event loop and runway as the thread. Each plane is a separate function waiting to execute. In reality only one plane can land on the runway at a time. What asyncio basically does it allows many planes to land simultaneously on the same runway by using the event loop to suspend functions and allow other functions to run when you use the await syntax it basically means that plane(function can be suspended and allow other functions to process