You are here:  Home » Python » 协同程序和任务 – 异步I / O(Python教程)(参考资料)

协同程序和任务

本节概述了与协同程序和任务一起使用的高级asyncio API .

  • Coroutines
  • Awaitables
  • 运行asyncio程序
  • 创建任务
  • 睡觉
  • 同时运行任务
  • 屏蔽取消
  • 超时
  • 等待原语
  • 从其他线程调度
  • 内省
  • 任务对象
  • 基于生成器的协同程序

协同程序

使用async / await语法声明的协同程序是编写asyncio应用程序的首选方法。例如,下面的代码片段(需要Python 3.7+)打印“hello”,等待1秒,然后打印“world”:

>>> import asyncio>>> async def main():...     print("hello")...     await asyncio.sleep(1)...     print("world")>>> asyncio.run(main())helloworld

请注意,只是调用一个协程将不会安排它执行:

>>> main()<coroutine object main at 0x1053bb7c8>

实际运行一个协同程序asyncio提供了三种主要机制:

  • asyncio.run()函数运行top-levelentry点“main()”函数(参见上面的例子。)

  • 等待协程。以下代码片段在等待1秒后打印“你好”,然后在等待another2秒:

    import asyncioimport timeasync def say_after(delay, what):    await asyncio.sleep(delay)    print(what)async def main():    print(f"started at {time.strftime("%X")}")    await say_after(1, "hello")    await say_after(2, "world")    print(f"finished at {time.strftime("%X")}")asyncio.run(main())

    预期输出:

    started at 17:13:52helloworldfinished at 17:13:55
  • asyncio.create_task()与asyncio一起运行coroutines的函数Tasks.

    让我们修改上面的例子并运行两个say_after coroutines concurrently

    async def main():    task1 = asyncio.create_task(        say_after(1, "hello"))    task2 = asyncio.create_task(        say_after(2, "world"))    print(f"started at {time.strftime("%X")}")    # Wait until both tasks are completed (should take    # around 2 seconds.)    await task1    await task2    print(f"finished at {time.strftime("%X")}")

    请注意,预期输出现在显示该片段比以前快1秒:

    started at 17:14:32helloworldfinished at 17:14:34

Awaitables

我们说对象是等待对象如果它可以用在await表达。许多asyncio API被设计为接受awaitables.

有三种主要类型awaitable对象: coroutines , 任务,和期货.

协程

Python协程是awaitables,因此可以从其他协程中等待:

import asyncioasync def nested():    return 42async def main():    # Nothing happens if we just call "nested()".    # A coroutine object is created but not awaited,    # so it *won"t run at all*.    nested()    # Let"s do it differently now and await it:    print(await nested())  # will print "42".asyncio.run(main())

重要

在本文档中,术语“coroutine”可以用于两个密切相关的概念:

  • 一个 coroutine functionasync def功能;
  • 一个 coroutine object:通过调用coroutine function.

asyncio也支持遗产基于生成器 coroutines.

Tasks

Tasks用来安排协程concurrently.

当一个协程包装使用Task这样的函数进入asyncio.create_task()自动调度即可快速运行:

import asyncioasync def nested():    return 42async def main():    # Schedule nested() to run soon concurrently    # with "main()".    task = asyncio.create_task(nested())    # "task" can now be used to cancel "nested()", or    # can simply be awaited to wait until it is complete:    await taskasyncio.run(main())

期货

A Future是特殊的低级等待对象表示异步操作的最终结果

当一个Future对象是awaited这意味着协程将一直等到Future在其他地方被解析.

需要asyncio中的未来对象以允许基于回调的代码与async / await一起使用.

通常没有必要在应用程序级代码创建Future对象.

可以等待有时会被库和一些asyncioAPI暴露的未来对象:

async def main():    await function_that_returns_a_future_object()    # this is also valid:    await asyncio.gather(        function_that_returns_a_future_object(),        some_python_coroutine()    )

返回Future对象的低级函数的一个很好的例子loop.run_in_executor().

运行asyncio程序

asyncio.runcoro, *, debug=False

这个函数运行传递的协同程序,负责管理asyncio事件循环和finalizing asynchronousgenerators.

当另一个asyncio事件循环在同一个线程中运行时,无法调用此函数.

如果debugTrue,则事件循环将以调试模式运行.

此函数始终创建一个新的事件循环并在结束时将其关闭。它应该被用作异步程序的主要入口点,理想情况下只能被调用一次.

新版本3.7:重要:这个函数已添加到Python 3.7中的asyncio 临时基础.

创建任务

asyncio.create_taskcoro

包裹coro 协程进入Task并安排执行。返回Task对象.

任务在get_running_loop(),RuntimeError返回的循环中执行,如果没有正在运行的循环当前线程则被引发

这个函数已经添加到Python 3.7 。在PHP 3.7之前,低级asyncio.ensure_future()功能可以用来代替:

async def coro():    ...# In Python 3.7+task = asyncio.create_task(coro())...# This works in all Python versions but is less readabletask = asyncio.ensure_future(coro())...

版本3.7.

睡眠

coroutine asyncio.sleepdelay, result=None, *, loop=None

阻挡delayseconds.

如果result提供,当协程完成时它返回给调用者.

sleep()总是暂停当前任务,允许其他任务运行

loop参数已弃用并计划在Python 3.10.中删除

协程示例每秒显示当前日期5秒:

import asyncioimport datetimeasync def display_date():    loop = asyncio.get_running_loop()    end_time = loop.time() + 5.0    while True:        print(datetime.datetime.now())        if (loop.time() + 1.0) >= end_time:            break        await asyncio.sleep(1)asyncio.run(display_date())

同时运行任务

awaitable asyncio.gather*aws, loop=None, return_exceptions=False

序列中运行等待对象aws concurrently.

如果在aws中任何等待的是协程,它会被自动调度为任务.

如果所有等待都成功完成,则结果是返回值的anaggregate列表。结果值的顺序与中的等待顺序相对应aws.

如果return_exceptionsFalse(默认),firstraised异常立即传播到gather()上的任务。aws序列中的其他等待将不会被取消并将继续运行.

如果return_exceptionsTrue,异常被视为成功结果,并在结果列表中汇总.

如果gather()cancelled,所有提交的等待(尚未完成)也是cancelled.

如果aws序列中的任何任务或未来是cancelled,它就好像它被提升了CancelledErrorgather()来电是在这种情况下取​​消。这是为了防止一个提交的任务/未来的取消导致其他任务/期货被取消.

示例:

import asyncioasync def factorial(name, number):    f = 1    for i in range(2, number + 1):        print(f"Task {name}: Compute factorial({i})...")        await asyncio.sleep(1)        f *= i    print(f"Task {name}: factorial({number}) = {f}")async def main():    # Schedule three calls *concurrently*:    await asyncio.gather(        factorial("A", 2),        factorial("B", 3),        factorial("C", 4),    )asyncio.run(main())# Expected output:##     Task A: Compute factorial(2)...#     Task B: Compute factorial(2)...#     Task C: Compute factorial(2)...#     Task A: factorial(2) = 2#     Task B: Compute factorial(3)...#     Task C: Compute factorial(3)...#     Task B: factorial(3) = 6#     Task C: Compute factorial(4)...#     Task C: factorial(4) = 24

更改版本3.7:如果gather本身被取消,取消传播无论return_exceptions.

屏蔽取消

awaitable asyncio.shieldaw, *, loop=None

保护等待对象来自cancelled.

如果aw是一个协程,它被自动安排为一个任务.

声明:

res = await shield(something())

相当于:

res = await something()

except如果包含它的协程被取消,那么运行的任务something()没有取消。从something()的角度来看,取消没有发生。虽然它的来电仍然被取消,所以“等待”的表达仍然提出了CancelledError.

如果something()被其他方式取消(即从内部取消)也会取消shield().

如果希望完全忽略取消(不推荐)shield()功能应与try / exceptclause结合使用,如下所示:

try:    res = await shield(something())except CancelledError:    res = None

超时

coroutine asyncio.wait_foraw, timeout, *, loop=None

等待aw 等待完成超时.

如果aw是一个协程,它自动被安排为一个任务.

timeout可以是None或者一个浮点数或第二个等号的int数。如果timeoutNone,阻止直到将来完成.

如果发生超时,它会取消任务并引发asyncio.TimeoutError.

为了避免任务cancellation,请将其包装在shield().

该函数将等到将来实际取消,所以总等待时间可能超过timeout.

如果等待取消,未来aw也被取消了

loop参数已弃用并计划在Python 3.10.中删除

例:

async def eternity():    # Sleep for one hour    await asyncio.sleep(3600)    print("yay!")async def main():    # Wait for at most 1 second    try:        await asyncio.wait_for(eternity(), timeout=1.0)    except asyncio.TimeoutError:        print("timeout!")asyncio.run(main())# Expected output:##     timeout!

更改版本3.7:什么时候 aw由于超时被取消,wait_for等待aw被取消。以前,它立刻提起asyncio.TimeoutError

Waiting Primitives

coroutine asyncio.waitaws, *, loop=None, timeout=None, return_when=ALL_COMPLETED

等待的物品 在里面 aws同时设置并阻塞,直到return_when.

指定的条件如果在aws中任何等待是一个协程,它会被自动调度为一个任务。将协同程序对象传递给wait()直接被弃用,因为它导致令人困惑的行为.

返回两组任务/期货:(done, pending).

用法:

done, pending = await asyncio.wait(aws)

loop参数已弃用并计划在Python 3.10.中删除

timeout(浮点数或整数),如果指定,可用于控制返回前等待的最大秒数.

注意此函数不会引发asyncio.TimeoutError在超时发生时未完成的未来或任务在第二组中简单回复.

return_when表示此函数何时返回。它必须是以下常量之一:

常量 说明
FIRST_COMPLETED 当任何未完成或取消时,该功能将返回.
FIRST_EXCEPTION 当任何结束时通过引发异常来返回该函数。如果没有未来会引发异常,则相当于ALL_COMPLETED.
ALL_COMPLETED 当所有未完成或被取消时,该功能将返回.

当发生超时时wait_for(), wait()不会取消未来.

注意

wait()协同程序自动调度为任务,然后返回(done, pending) sets中隐式创建的Task对象。因此,以下代码将无法正常工作:

async def foo():    return 42coro = foo()done, pending = await asyncio.wait({coro})if coro in done:    # This branch will never be run!

以下是如何修复上面的代码片段:

async def foo():    return 42task = asyncio.create_task(foo())done, pending = await asyncio.wait({task})if task in done:    # Everything will work as expected now.

直接将协程对象传递给wait()已弃用.

asyncio.as_completed (aws, *, loop=None, timeout=None)

Run等待的物品aws同时设置。返回Future对象的迭代器。每个返回的Future对象代表剩下的awaitable的集合中最早的结果.

Raises asyncio.TimeoutError如果超时发生在所有期货完成之前.

示例:

for f in as_completed(aws):    earliest_result = await f    # ...

从其他线程调度

asyncio.run_coroutine_threadsafecoro, loop

提交协程到给定的事件循环。Thread-safe.

返回concurrent.futures.Future等待来自另一个OS线程的结果.

这个函数是从一个不同的OS线程调用的循环正在运行。示例:

# Create a coroutinecoro = asyncio.sleep(1, result=3)# Submit the coroutine to a given loopfuture = asyncio.run_coroutine_threadsafe(coro, loop)# Wait for the result with an optional timeout argumentassert future.result(timeout) == 3

如果在协程中引发异常,则会通知返回的Future。它也可以用来取消事件循环中的任务:

try:    result = future.result(timeout)except asyncio.TimeoutError:    print("The coroutine took too long, cancelling the task...")    future.cancel()except Exception as exc:    print(f"The coroutine raised an exception: {exc!r}")else:    print(f"The coroutine returned: {result!r}")

参见文档的并发和多线程部分.

与其他asyncio不同函数这个函数需要loop参数明确传递

新版本3.5.1.

Introspection

asyncio.current_task (loop=None

返回当前运行的Task实例,或None如果没有任务正在运行.

如果loopNone get_running_loop()用于获取当前循环.

新版本3.7.

asyncio.all_tasks (loop=None)

返回一套尚未完成的Task由循环运行的对象

如果loopNone, get_running_loop()用于获取当前循环

新版本3.7.

任务对象

class asyncio.Taskcoro, *, loop=None

一个Future-like运行Python的对象 coroutine 。不是thread-safe.

任务用于在事件循环中运行协同程序。如果一个协程在Future上等待,则该任务暂停执行协程并等待Future的完成。当未来是done,包裹的协程的执行恢复.

事件循环使用协作调度:事件循环一次运行任务。当一个Task等待完成aFuture时,事件循环运行其他任务,回调或执行操作.

使用高级asyncio.create_task()函数创建任务,或者使用低级loop.create_task()ensure_future()函数。任务的手动实例化不鼓励

取消正在运行的任务使用cancel()方法。调用它会导致Task在包裹的协同程序中抛出一个CancelledError异常。如果在取消期间正在等待Futureobject的协程,则Future对象将被取消.

cancelled()可以用来检查任务是否被取消。如果包装的协程没有压缩True异常并且实际上被取消,方法返回CancelledError

asyncio.Task继承自Future所有的API除了Future.set_result()Future.set_exception().

任务支持contextvars模块。创建Taskis时,它会复制当前上下文,然后在复制的上下文中运行其coroutine .

更改版本3.7:添加了对contextvars模块的支持

cancel// ()

请求取消任务。

这个安排CancelledError在事件循环的下一个循环中被抛到包装的协程的异常.

协程有机会通过用try…… except CancelledErrorfinally因此,不像Future.cancel(), Task.cancel()并不保证任务将被取消,虽然完全抑制取消并不常见,并且积极劝阻.

以下示例说明协同程序如何拦截取消请求:

async def cancel_me():    print("cancel_me(): before sleep")    try:        # Wait for 1 hour        await asyncio.sleep(3600)    except asyncio.CancelledError:        print("cancel_me(): cancel sleep")        raise    finally:        print("cancel_me(): after sleep")async def main():    # Create a "cancel_me" Task    task = asyncio.create_task(cancel_me())    # Wait for 1 second    await asyncio.sleep(1)    task.cancel()    try:        await task    except asyncio.CancelledError:        print("main(): cancel_me is cancelled now")asyncio.run(main())# Expected output:##     cancel_me(): before sleep#     cancel_me(): cancel sleep#     cancel_me(): after sleep#     main(): cancel_me is cancelled now
cancelled

返回True如果任务是cancelled.

任务是cancelled当要求取消时cancel()包裹的协程传播了CancelledError扔进去的例外.

done

返回True如果任务是done.

任务是done当包装的协程返回值,引发异常,或者任务被取消时.

result ()

返回任务的结果.

如果任务是done,包装的coroutineis的结果返回(或者如果协程引发异常,则会重新提升。)

如果任务已经cancelled,这个方法引起了CancelledError exception.

如果Task的结果还没有,这个方法引起了InvalidStateError例外。

exception

返回Task的例外.

如果包装的协程引发异常,则返回异常。如果包装的协程正常返回,则此方法返回None.

如果任务已经cancelled,则此方法会引发CancelledError例外

如果任务还没有done,这个方法提出了InvalidStateError异常

add_done_callback// (callback, *, context=None )

添加一个回调,当任务为done.

此方法仅应用于基于低级回调的代码.

有关详细信息,请参阅Future.add_done_callback()的文档.

remove_done_callback (callback)

从回调列表中删除callback.

此方法仅应用于基于低级回调的代码.

有关详细信息,请参阅Future.remove_done_callback()的文档.

get_stack (*, limit=None)

返回此任务的堆栈帧列表.

如果未完成包装协程,则会返回挂起的堆栈。如果协程已成功完成或被取消,则返回一个空列表。如果协程被异常终止,则返回回溯帧列表.

框架总是从最旧到最新排序.

只有一个堆叠框架可用于悬挂的协程

可选limit参数设置framesto返回的最大数量;默认情况下,返回所有可用的帧。返回列表的排序因whethera堆栈或返回的追溯而有所不同:返回最新的astack帧,但返回最旧的追溯帧。(这与回溯模块的行为相匹配。)

print_stack*, limit=None, file=None

打印此任务的堆栈或回溯.

对于由get_stack().

limit参数传递给get_stack()直接

file参数是输出写入的I / O流;默认输出写入sys.stderr.

classmethod all_tasksloop=None

返回一组事件循环的所有任务.

默认情况下,返回当前事件循环的所有任务。如果loopNoneget_event_loop()function用于获取当前循环

这个方法是弃用并将在Python 3.9中删除。使用asyncio.all_tasks()功能代替

classmethod current_taskloop=None)

返回当前正在运行的任务或None.

如果loopNoneget_event_loop()函数用于获取当前循环

这个方法是弃用并将被删除inPython3.9。使用asyncio.current_task() functioninstead.

基于生成器协同程序

注意

支持基于生成器的协程弃用 andis计划在Python 3.10中删除.

Generator-based coroutines preate async / await语法。它们是使用yield from表达式来awaiton Futures和其他协程的Python生成器.

基于生成器的协同程序应该用@asyncio.coroutine修饰,虽然没有强制执行.

@asyncio.coroutine

Decorator标记基于生成器的协同程序.

这个装饰器使传统的基于生成器的协同程序与async / await代码兼容:

@asyncio.coroutinedef old_style_coroutine():    yield from asyncio.sleep(1)async def main():    await old_style_coroutine()

这个装饰器是弃用并计划在Python 3.10.

这个装饰器不应该用于async defcoroutines.

asyncio.iscoroutineobj

返回True如果obj协程对象.

这个方法不同于inspect.iscoroutine()因为基于发电机的协同程序返回True.

asyncio.iscoroutinefunction(func)

返回True如果func协程函数.

这个方法与inspect.iscoroutinefunction()不同,因为它返回True用于基于生成器的协程函数@coroutine.