Advertisement

Python 高并发 协程与任务— Python 3.7 版本(与3.6版本有区别)

阅读量:

协程与任务

本节将简述用于协程与任务的高层级 API。

协程

支持 await 的对象

启动 asyncio 程序

该链接位于Python官方文档中指定位置,并用于创建 asyncio 作业

休眠

并行执行的任务

阻止撤销取消操作

超时

简单的等待

源自其他线程的时间表

自我审视(https://docs.python.org/zh-cn/3.7/library/asyncio-task.html#introspection) 是一种用于开发与实现 asyncio 模块中 introspection 功能的技术方法

Task 对象

基于生成器的协程

协程

使用 async/await 语法进行定义是编写异步应用的理想方案。
例如以下代码段(需Python3.7及以上)输出‘hello’后等待一秒并随后输出‘world’:

复制代码

注意:简单地调用一个协程并不会将其加入执行日程:

复制代码

要真正运行一个协程,asyncio 提供了三种主要机制:

[asyncio.run()] 函数被设计为负责处理所有异步任务的核心函数(如前所述,在示例中进行了详细说明)。它通过调用 asyncio.run() 方法来启动并协调各个 async 任务序列的核心逻辑代码块(如前所述,在示例中进行了详细说明)。

该代码段将在等待1秒后输出'hello';随后再次阻塞2秒后输出'world'

复制代码

预期的输出:

复制代码

asyncio.create_task() 函数的作用是用于同时执行作为 asyncio 任务中的多个协程。

让我们修改以上示例, 并发 运行两个 say_after 协程:

复制代码

注意,预期的输出显示代码段的运行时间比之前快了 1 秒:

复制代码

可等待对象

如果一个实体可以在 await 代码块中使用,则被称为 可期待 实体。 asyncio接口通常被构造为能够处理可期待的任务。

可等待 对象有三种主要类型: 协程 , 任务Future.

协程

Python 协程属于 可等待 对象,因此可以在其他协程中被等待:

复制代码

重要

在本文档中 "协程" 可用来表示两个紧密关联的概念:

异步函数 : 其具体实现形式为 async def 的实体;

协程对象 : 调用 协程函数 所返回的对象。

asyncio 也提供传统的基于 generator 的协程功能

任务

任务 被用来设置日程以便 并发 执行协程。

当某个协程通过asyncio.create_task()等函数被整合成一项任务时,在创建后会被安排在日程表中执行,并立即启动

复制代码

Futures

Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果

当一个 Future 对象 处于等待状态时 ,这意味着协程将保持等待直到该 Future 实例完成其在其他地方的操作。

在 asyncio框架中使用async/await时必须借助wait对象来实现基于回调的代码执行

通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象:

复制代码

一种优质的返回机制用于低层操作的一个示例是 loop.run_in_executor() 链接到Python官方文档。

运行 asyncio 程序

asyncio.``run(coro , * , debug=False)

运行 coroutine coro 并返回其结果

此函数运行传入的协程,负责管理 asyncio 事件循环并 完结异步生成器

当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。

如果 debugTrue,事件循环将以调试模式运行。

此函数始终创建一个新的事件循环,并在终止时关闭之。 asyncio 程序应该以此函数作为其主入口点,在大多数情况下应仅调用一次。

示例:

复制代码

关键

关键

示例

需要改写的内容

创建任务

asyncio.``create_task(coro)

用于将 coro 协会(协程)整合为一个 Task 安排在日程中待命执行。返回该 Task 实例。

该任务位于 asyncio.get_running_loop() 方法返回的事件循环中执行;若当前线程不在运行中的 asyncio 循环中,则会触发 RuntimeError。

该函数 于Python 3.7版本中被发布 。而在Python 3.7版本之前,则可采用更底层的[asyncio.ensure_future()]函数作为替代方案。

复制代码

3.7 新版功能.

休眠

coroutineasyncio.``sleep(delay , result=None , * , loop=None)

阻塞 delay 指定的秒数。

如果指定了 result ,则当协程完成时将其返回给调用者。

sleep() 总是会挂起当前任务,以允许其他任务运行。

loop 参数已弃用,计划在 Python 3.10 中移除。

以下协程示例运行 5 秒,每秒显示一次当前日期:

复制代码

并发运行任务

awaitableasyncio.``gather(*aws , loop=None , return_exceptions=False)

并行执行Aws序列中的可等待对象

如果 aws 中的某个可等待对象为协程,它将自动作为一个任务加入日程。

如果所有可等待对象全部完成,则会生成一个由返回值并汇总生成的列表。其结果值按照 aws 中各可等待对象出现的顺序排列。

  • 如果 return_exceptions 被指定为 False(默认情况下),则首次发生的异常会被立即传播给等待 gather() 的任务。
  • AWS 表达式序列中定义的其他可等待对象将不会被取消并将继续运行。

return_exceptions 设置为 True 时,系统不仅会捕获异常事件本身的数据信息,并且还会将这些异常信息与其他成功的操作数据一起整合到最终的结果集合中。

如果 gather() 被取消 ,所有被提交 (尚未完成) 的可等待对象也会 被取消

如果 aws 序列中的任意一个 Task 或 Future 对象被取消,则会被视为引发了 CancelledError 的一样处理——在这种情况下 gather() 调用 不会 被取消。这种设计是为了确保一旦某个已提交的 Task/Future 被取消就不会触发其他未完成的任务或未来对象也被取消。

示例:

复制代码

在 3.7 版本中对_gather_进行取消操作时,不论_rerun_exceptions_设置为任何值,消息都会被传播。

屏蔽取消操作

awaitableasyncio.``shield(aw , * , loop=None)

确保一个待处理任务不被提前取消

如果 aw 是一个协程,它将自动作为任务加入日程。

以下语句:

复制代码

相当于:

复制代码

主要区别在于:如果包含它的协程被取消时,在 something() 中运行的任务并不会受到影响。从 something() 视角下观察时,并未发生此操作。但调用者已被取消执行,则会导致 "await" 表达式仍会触发 CancelledError

如果 shield() 的取消依赖于某个特定的操作(即该操作在其内部执行)则 something() 的取消也将随之触发

若想完全忽略取消操作(不建议)的情况下,则 shield() 函数必须配合一个 try-catch 代码块进行处理,请参考以下示例。

复制代码

超时

coroutineasyncio.``wait_for(aw , timeout , * , loop=None)

一直 await 可 await的对象 完成,并返回结果;若指定超时时间后不再返回结果

如果 aw 是一个协程,它将自动作为任务加入日程。

timeout 可取值为 None、float 类型或 int 类型数值表示的时间间隔(秒)。若 timeout 未指定,则程序会一直等待任务完成。

一旦出现超时情况, 任务将会被取消, 并且会触发 asyncio.TimeoutError 异常.

应避免对任务取消(即Task.cancel)进行操作,并建议在需要时使用shield()来保护任务不被意外取消。

在目标对象成功取消之前执行该函数操作可能会导致总等待时间…这可能导致总等待时间…因此,在调用此函数时可能会遇到长时间的等待情况。

如果等待被取消,则 aw 指定的对象也会被取消。

loop 参数已弃用,计划在 Python 3.10 中移除。

示例:

复制代码

自 Python 3.7 版本起,在处理超时事件时

当某个指定的异常(如 _aw)发生时

这些旧版本可能会直接抛出 asyncio.TimeoutError 异常

简单等待

coroutine`.await asyncio.wait(_aws_instance, * , loop_or_current=_loop_or_current, timeout=_timeout, returned_completions=_return_when)

同时启动 aws 配置为 可等待对象 的异步任务,并在满足 return_when 指定的时间点后自动终止主线程

aws 中的某个等待对象被配置为协程时,该请求会被立即分配到任务队列中。由于直接向 wait() 方法传递协程对象已被弃用且可能导致令人困惑的行为。

返回两个 Task/Future 集合: (done, pending)

用法:

复制代码

loop 参数已弃用,计划在 Python 3.10 中移除。

如设置为 整数或浮点数类型,则该变量将被用于调节返回前等待的时间长度。

请确保该函数不会触发 asyncio.TimeoutError 错误。一旦发生超时事件,在预定时间后会终止未完成的 Future 或 Task。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。

wait_for() 相比不同的是,在超时发生时 wait() 不会取消任何可等待对象。

注解

触发wait()后会立即把协程作为任务加入待处理列表;而后续操作中会被直接返回一个由已完成和未完成任务组成的集合。由此可见,在这段代码中并不会实现预期的功能。

复制代码

以上代码段的修正方法如下:

复制代码

直接向 wait() 传入协程对象的方式已弃用。

asyncio.``as_completed(aws , * , loop=None , timeout=None)

同时执行 AWS 集合中的 awaitable 对象(可等待对象)。该方法返回一个 Future 对象的迭代器。每个 Futures 对象表示剩余可等待对象集合中最先完成的结果。

若所有 Future 对象在完成之前发生超时,则会导致 asyncio.TimeoutError(参见 Python 官方文档)。

示例:

复制代码

源自其他进程的时间表

asyncio.``run_coroutine_threadsafe(coro , loop)

向指定事件循环提交一个协程。线程安全。

请返回一个 concurrent.futures.Future 实例,并为此实例等待以便其他操作系统的线程获取结果

此函数应该从另一个 OS 线程中调用,而非事件循环运行所在线程。示例:

复制代码

当协程内部发生异常时, 将会触发返回给调用者的Future事件。此外还可以用于中断正在执行的任务流程

复制代码

请参考 concurrency and multithreading 详细内容

不同与其他 asyncio 函数,此函数要求显式地传入 loop 参数。

3.5.1 新版功能.

内省

asyncio.``current_task(loop=None)

获取现在正在运行的 [Task](https://docs.python.org/zh-cn/3.7/library/asyncio-task.html#asyncio.Task) 实例;如果未有正在运行的任务,则返回 None

loop 等于 None 时,则会调用 get_running_loop() 函数以获取当前事件循环

3.7 新版功能.

asyncio.``all_tasks(loop=None)

在事件循环中等待处理的任务集合

loop 设置为 None时,则会通过调用 get_running_loop() 来获取当前事件循环。

3.7 新版功能.

Task 对象

classasyncio.``Task(coro , * , loop=None)

类似于Future的对象,在Python中支持运行协程的功能,并且是非线程安全的。

Task 用于在事件循环中执行协程。当一个协程等待一个 Future 完成时,在这个过程中 Task 会暂停该协程的执行并等待 Future 的完成。一旦 Future 完成任务后就会打包相关的协程重新开始运行。

采用协同日程调度机制的事件循环: 每个事件循环依次执行一个Task对象。每个Task对象处于等待其 associated Future对象完成的状态,并促使该事件循环依次执行其他Task、回调或进行IO操作。

推荐采用高级别接口[asyncio.create_task()]来实现任务管理功能。该库还提供了底层接口[loop.create_task()]供开发者直接操作事件循环机制,并支持通过[ensure_future()]函数快速构造未来任务对象以简化流程。需要注意的是,默认情况下无需自行创建Task对象以避免潜在问题

可通过调用 cancel() 方法来取消一个正在运行中的 Task 对象,并将导致该 Task 对象抛出一个 CancelledError 异常并传递给打包的协程。如果在取消过程中有一个协程等待某个 Future 对象,则该 Future 对象也会被取消。

cancelled() 可以用作判断 Task 对象是否已被取消的一种手段。当打包的协程未阻止 CancelledError 异常并实际发生取消时,则该方法会返回 True。

asyncio.Task源自 Future的所有 API功能,并不包括 set_result()set_exception()这两个方法。

Task 对象集成 contextvars 模块(链接:https://docs.python.org/zh-cn/3.7/library/contextvars.html#module-contextvars)

在 3.7 版本更新中新增了对 Python 官方文档中 contextvars 模块的支持

cancel()

请求取消 Task 对象。

将在由下一轮事件循环抛出一个 CancelledError 异常事件,并触发到被封包的协程。

协程在之后有机会进行清理甚至借助 try...except...finally 这种代码块来抑制异常以拒绝请求。与 Future.cancel() 的不同之处在于 它不保证取消 即使偶尔出现完全被取消的情况 也应当避免这种做法。

以下示例演示了协程是如何侦听取消请求的:

复制代码

cancelled()

如果 Task 对象 被取消 则返回 True

当发起 cancel() 操作时,在此情况下 Task 将被触发取消请求。其关联的任务将传播被触发的 CancelledError 异常信息。

done()

如果 Task 对象 已完成 则返回 True

一旦协程返回值、引发异常或自身被取消,则被视为 已完成

result()

返回 Task 的结果。

每当 Task 对象 已完成 ,其封包中的协程将执行完毕并返回结果(若协程发生错误,则错误会被重新抛出)。

当 Task 对象 被终止 时 ,此方法将触发一个 CancelledError 错误 。

当 Task 对象的结果无法使用时, 该方法将触发一个 InvalidStateError 异常

exception()

返回 Task 对象的异常。

如果一个包裹中的协程导致了一个错误,则该错误会被返回。反之,若该包裹中的协程顺利完成运行,则该方法将不会返回 None

当 Task 对象 被取消 执行时 ,该方法将导致指定的 CancelledError 异常发生。

如果该Task实例还未进入最终确认阶段,并且此方法调用后将导致系统处于一个不可逆的InvalidStateError异常状态。

add_done_callback(callback , * , context=None)

添加一个回调,将在 Task 对象 完成 时被运行。

此方法应该仅在低层级的基于回调的代码中使用。

如需更深入了解相关内容,请查阅 Future.add_done_callback() 的官方文档。

remove_done_callback(callback)

从回调列表中移除 callback

此方法应该仅在低层级的基于回调的代码中使用。

如需获取更多详细信息,请参考代码块中的Future.remove_done_callback()及其相关文档链接。

get_stack(* , limit=None)

返回此 Task 对象的栈框架列表。

如果封装的 coroutine 未能完成,则会返回其阻塞位置。当 coroutine 成功结束或被手动终止时,则会返回空列表。若 coroutine 发生错误后终止,则会触发 Intrusive Programming List (IPL)。

框架总是从按从旧到新排序。

每个被挂起的协程只返回一个栈框架。

可选参数 limit 设定结果集的最大数量限制,默认情况下会提取全部组件。查询结果的顺序取决于是采用栈机制还是回溯机制:使用栈时会列出最新的组件;而使用回溯则会列出最早的组件。这与 traceback 模块的行为一致。

print_stack(* , limit=None , file=None)

打印此 Task 对象的栈或回溯。

该方法生成的输出相当于使用 get_stack() 指令借助于 asyncio 模块所得到的框架结构。

limit 参数会被直接传递给[get_stack()]

文件参数用于表示被输出的输入/输出流;在不指定的情况下,默认会将输出写入到sys.stderr

classmethodall_tasks(loop=None)

返回一个事件循环中所有任务的集合。

默认情况下, 程序会返回现有的事件循环中的所有任务. 当 loop 未指定时, 默认调用 get_event_loop() 来获取现有事件循环.

该方法已被废弃,并将自Python 3.9版本起移除。建议采用 asyncio.all_tasks()函数,并提供以下功能:在多线程环境中同步执行多个 asyncio任务并返回所有结果列表。此功能旨在提高任务处理效率。

classmethodcurrent_task(loop=None)

返回当前运行任务或 None

loop 未指定时,则会调用 get_event_loop() 函数以获取当前事件循环

此方法已被废弃,并将在Python 3.9版本中被移除。建议采用asyncio.current_task()函数进行相关操作。

主要依赖于基于生成器实现的协程

注解

对基于生成器的协程的支持 已弃用 并计划在 Python 3.10 中移除。

基于生成器的协程是 async/await 语法的基础。通过 yield from 语句实现生成的 Python 生成器能够等待 Future 和其它协程。

对于基于生成器编写的协程来说,在大多数情况下最好采用 @asyncio.coroutine 进行装饰,并非这是一个强制性的要求

@``asyncio.``coroutine

用来标记基于生成器的协程的装饰器。

此装饰器使得旧式的基于生成器的协程能与 async/await 代码相兼容:

复制代码

此装饰器 已弃用 并计划觉得 Python 3.10 中移除。

此装饰器不应应用于 async def 协程

asyncio.``iscoroutine(obj)

obj 属于 协程对象 时,则返回 True

此方法不支持使用 inspect.iscoroutine() 的语法结构,并且由于其能够准确判断基于生成器的协程是否为 coroutine 而返回 True。

asyncio.``iscoroutinefunction(func)

func 被识别为 协程函数 时,则返回 True

该方法与 inspect.iscoroutinefunction() 不同之处在于其能够识别并返回True的基于生成器的协程函数

全部评论 (0)

还没有任何评论哟~