返回介绍

18.1 线程与协程对比

发布于 2024-02-05 21:59:47 字数 11266 浏览 0 评论 0 收藏 0

有一次讨论线程和 GIL 时,Michele Simionato 发布了一个简单但有趣的示例:在长时间计算的过程中,使用 multiprocessing 包在控制台中显示一个由 ASCII 字符 "|/-\" 构成的动画旋转指针。

我改写了 Simionato 的示例,一个借由 threading 模块使用线程实现,一个借由 asyncio 包使用协程实现。我这么做是为了让你对比两种实现,理解如何不使用线程来实现并发行为。

示例 18-1 和示例 18-2 的输出是动态的,因此你一定要运行这两个脚本,看看结果如何。如果你在坐地铁(或者在某个没有 Wi-Fi 连接的地方),可以看图 18-1,想象单词“thinking”之前的 \ 线是旋转的。

图 18-1:spinner_thread.py 和 spinner_asyncio.py 两个脚本的输出类似:旋转指针对象的字符串表示形式和文本“Answer: 42”。在这个截图中,spinner_asyncio.py 脚本仍在运行中,旋转指针显示的是“\ thinking!”消息;脚本运行结束后,那一行会替换成“Answer: 42”

首先,分析 spinner_thread.py 脚本(见示例 18-1)。

示例 18-1 spinner_thread.py:通过线程以动画形式显示文本式旋转指针

import threading
import itertools
import time
import sys


class Signal:  ➊
  go = True


def spin(msg, signal):  ➋
  write, flush = sys.stdout.write, sys.stdout.flush
  for char in itertools.cycle('|/-\\'):  ➌
    status = char + ' ' + msg
    write(status)
    flush()
    write('\x08' * len(status))  ➍
    time.sleep(.1)
    if not signal.go:  ➎
      break
  write(' ' * len(status) + '\x08' * len(status))  ➏


def slow_function():  ➐
  # 假装等待I/O一段时间
  time.sleep(3)  ➑
  return 42


def supervisor():  ➒
  signal = Signal()
  spinner = threading.Thread(target=spin,
                 args=('thinking!', signal))
  print('spinner object:', spinner)  ➓
  spinner.start()  ⓫
  result = slow_function()  ⓬
  signal.go = False  ⓭
  spinner.join()  ⓮
  return result


def main():
  result = supervisor()  ⓯
  print('Answer:', result)


if __name__ == '__main__':
  main()

❶ 这个类定义一个简单的可变对象;其中有个 go 属性,用于从外部控制线程。

❷ 这个函数会在单独的线程中运行。signal 参数是前面定义的 Signal 类的实例。

❸ 这其实是个无限循环,因为 itertools.cycle 函数会从指定的序列中反复不断地生成元素。

❹ 这是显示文本式动画的诀窍所在:使用退格符(\x08)把光标移回来。

❺ 如果 go 属性的值不是 True 了,那就退出循环。

❻ 使用空格清除状态消息,把光标移回开头。

❼ 假设这是耗时的计算。

❽ 调用 sleep 函数会阻塞主线程,不过一定要这么做,以便释放 GIL,创建从属线程。

❾ 这个函数设置从属线程,显示线程对象,运行耗时的计算,最后杀死线程。

❿ 显示从属线程对象。输出类似于 <Thread(Thread-1, initial)>。

⓫ 启动从属线程。

⓬ 运行 slow_function 函数,阻塞主线程。同时,从属线程以动画形式显示旋转指针。

⓭ 改变 signal 的状态;这会终止 spin 函数中的那个 for 循环。

⓮ 等待 spinner 线程结束。

⓯ 运行 supervisor 函数。

注意,Python 没有提供终止线程的 API,这是有意为之的。若想关闭线程,必须给线程发送消息。这里,我使用的是 signal.go 属性:在主线程中把它设为 False 后,spinner 线程最终会注意到,然后干净地退出。

下面来看如何使用 @asyncio.coroutine 装饰器替代线程,实现相同的行为。

 第 16 章的小结说过,asyncio 包使用的“协程”是较严格的定义。适合 asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield。此外,适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用;或者把协程传给 asyncio 包中的某个函数,例如 asyncio.async(...) 和本章要介绍的其他函数,从而驱动协程。最后,@asyncio.coroutine 装饰器应该应用在协程上,如下述示例所示。

我们来分析示例 18-2。

示例 18-2 spinner_asyncio.py:通过协程以动画形式显示文本式旋转指针

import asyncio
import itertools
import sys


@asyncio.coroutine  ➊
def spin(msg):  ➋
  write, flush = sys.stdout.write, sys.stdout.flush
  for char in itertools.cycle('|/-\\'):
    status = char + ' ' + msg
    write(status)
    flush()
    write('\x08' * len(status))
    try:
      yield from asyncio.sleep(.1)  ➌
    except asyncio.CancelledError:  ➍
      break
  write(' ' * len(status) + '\x08' * len(status))


@asyncio.coroutine
def slow_function():  ➎
  # 假装等待I/O一段时间
  yield from asyncio.sleep(3)  ➏
  return 42


@asyncio.coroutine
def supervisor():  ➐
  spinner = asyncio.async(spin('thinking!'))  ➑
  print('spinner object:', spinner)  ➒
  result = yield from slow_function()  ➓
  spinner.cancel()  ⓫
  return result


def main():
  loop = asyncio.get_event_loop()  ⓬
  result = loop.run_until_complete(supervisor())  ⓭
  loop.close()
  print('Answer:', result)


if __name__ == '__main__':
  main()

❶ 打算交给 asyncio 处理的协程要使用 @asyncio.coroutine 装饰。这不是强制要求,但是强烈建议这么做。原因在本列表后面。

❷ 这里不需要示例 18-1 中 spin 函数中用来关闭线程的 signal 参数。

❸ 使用 yield from asyncio.sleep(.1) 代替 time.sleep(.1),这样的休眠不会阻塞事件循环。

❹ 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求,因此退出循环。

❺ 现在,slow_function 函数是协程,在用休眠假装进行 I/O 操作时,使用 yield from 继续执行事件循环。

❻ yield from asyncio.sleep(3) 表达式把控制权交给主循环,在休眠结束后恢复这个协程。

❼ 现在,supervisor 函数也是协程,因此可以使用 yield from 驱动 slow_function 函数。

❽ asyncio.async(...) 函数排定 spin 协程的运行时间,使用一个 Task 对象包装 spin 协程,并立即返回。

❾ 显示 Task 对象。输出类似于 <Task pending coro=<spin() running at spinner_ asyncio.py:12>>。

❿ 驱动 slow_function() 函数。结束后,获取返回值。同时,事件循环继续运行,因为 slow_function 函数最后使用 yield from asyncio.sleep(3) 表达式把控制权交回给了主循环。

⓫ Task 对象可以取消;取消后会在协程当前暂停的 yield 处抛出 asyncio.CancelledError 异常。协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消。

⓬ 获取事件循环的引用。

⓭ 驱动 supervisor 协程,让它运行完毕;这个协程的返回值是这次调用的返回值。

 除非想阻塞主线程,从而冻结事件循环或整个应用,否则不要在 asyncio 协程中使用 time.sleep(...)。如果协程需要在一段时间内什么也不做,应该使用 yield from asyncio.sleep(DELAY)。

使用 @asyncio.coroutine 装饰器不是强制要求,但是强烈建议这么做,因为这样能在一众普通的函数中把协程凸显出来,也有助于调试:如果还没从中产出值,协程就被垃圾回收了(意味着有操作未完成,因此有可能是个缺陷),那就可以发出警告。这个装饰器不会预激协程

注意,spinner_thread.py 和 spinner_asyncio.py 两个脚本的代码行数差不多。supervisor 函数是这两个示例的核心。下面详细对比二者。示例 18-3 只列出了线程版示例中的 supervisor 函数。

示例 18-3 spinner_thread.py:线程版 supervisor 函数

def supervisor():
  signal = Signal()
  spinner = threading.Thread(target=spin,
                 args=('thinking!', signal))
  print('spinner object:', spinner)
  spinner.start()
  result = slow_function()
  signal.go = False
  spinner.join()
  return result

为了对比,示例 18-4 列出了 supervisor 协程。

示例 18-4 spinner_asyncio.py:异步版 supervisor 协程

@asyncio.coroutine
def supervisor():
  spinner = asyncio.async(spin('thinking!'))
  print('spinner object:', spinner)
  result = yield from slow_function()
  spinner.cancel()
  return result

这两种 supervisor 实现之间的主要区别概述如下。

asyncio.Task 对象差不多与 threading.Thread 对象等效。Victor Stinner(本章的特约技术审校)指出,“Task 对象像是实现协作式多任务的库(例如 gevent)中的绿色线程(green thread)”。

Task 对象用于驱动协程,Thread 对象用于调用可调用的对象。

Task 对象不由自己动手实例化,而是通过把协程传给 asyncio.async(...) 函数或 loop.create_task(...) 方法获取。

获取的 Task 对象已经排定了运行时间(例如,由 asyncio.async 函数排定);Thread 实例则必须调用 start 方法,明确告知让它运行。

在线程版 supervisor 函数中,slow_function 函数是普通的函数,直接由线程调用。在异步版 supervisor 函数中,slow_function 函数是协程,由 yield from 驱动。

没有 API 能从外部终止线程,因为线程随时可能被中断,导致系统处于无效状态。如果想终止任务,可以使用 Task.cancel() 实例方法,在协程内部抛出 CancelledError 异常。协程可以在暂停的 yield 处捕获这个异常,处理终止请求。

supervisor 协程必须在 main 函数中由 loop.run_until_complete 方法执行。

上述比较应该能帮助你理解,与更熟悉的 threading 模型相比,asyncio 是如何编排并发作业的。

线程与协程之间的比较还有最后一点要说明:如果使用线程做过重要的编程,你就知道写出程序有多么困难,因为调度程序任何时候都能中断线程。必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,防止数据处于无效状态。

而协程默认会做好全方位保护,以防止中断。我们必须显式产出才能让程序的余下部分运行。对协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任意时刻只有一个协程运行。想交出控制权时,可以使用 yield 或 yield from 把控制权交还调度程序。这就是能够安全地取消协程的原因:按照定义,协程只能在暂停的 yield 处取消,因此可以处理 CancelledError 异常,执行清理操作。

下面说明 asyncio.Future 类与第 17 章所用的 concurrent.futures.Future 类之间的区别。

18.1.1 asyncio.Future:故意不阻塞

asyncio.Future 类与 concurrent.futures.Future 类的接口基本一致,不过实现方式不同,不可以互换。“PEP 3156—Asynchronous IO Support Rebooted: the‘asyncio’Module”对这个不幸状况是这样说的:

未来可能会统一 asyncio.Future 和 concurrent.futures.Future 类实现的期物(例如,为后者添加兼容 yield from 的 __iter__ 方法)。

如 17.1.3 节所述,期物只是调度执行某物的结果。在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一个协程,排定它的运行时间,然后返回一个 asyncio.Task 实例——也是 asyncio.Future 类的实例,因为 Task 是 Future 的子类,用于包装协程。这与调用 Executor.submit(...) 方法创建 concurrent.futures.Future 实例是一个道理。

与 concurrent.futures.Future 类似,asyncio.Future 类也提供了 .done()、.add_done_callback(...) 和 .result() 等方法。前两个方法的用法与 17.1.3 节所述的一样,不过 .result() 方法差别很大。

asyncio.Future 类的 .result() 方法没有参数,因此不能指定超时时间。此外,如果调用 .result() 方法时期物还没运行完毕,那么 .result() 方法不会阻塞去等待结果,而是抛出 asyncio.InvalidStateError 异常。

然而,获取 asyncio.Future 对象的结果通常使用 yield from,从中产出结果,如示例 18-8 所示。

使用 yield from 处理期物,等待期物运行完毕这一步无需我们关心,而且不会阻塞事件循环,因为在 asyncio 包中,yield from 的作用是把控制权还给事件循环。

注意,使用 yield from 处理期物与使用 add_done_callback 方法处理协程的作用一样:延迟的操作结束后,事件循环不会触发回调对象,而是设置期物的返回值;而 yield from 表达式则在暂停的协程中生成返回值,恢复执行协程。

总之,因为 asyncio.Future 类的目的是与 yield from 一起使用,所以通常不需要使用以下方法。

无需调用 my_future.add_done_callback(...),因为可以直接把想在期物运行结束后执行的操作放在协程中 yield from my_future 表达式的后面。这是协程的一大优势:协程是可以暂停和恢复的函数。

无需调用 my_future.result(),因为 yield from 从期物中产出的值就是结果(例如,result = yield from my_future)。

当然,有时也需要使用 .done()、.add_done_callback(...) 和 .result() 方法。但是一般情况下,asyncio.Future 对象由 yield from 驱动,而不是靠调用这些方法驱动。

下面分析 yield from 和 asyncio 包的 API 如何拉近期物、任务和协程的关系。

18.1.2 从期物、任务和协程中产出

在 asyncio 包中,期物和协程关系紧密,因为可以使用 yield from 从 asyncio.Future 对象中产出结果。这意味着,如果 foo 是协程函数(调用后返回协程对象),抑或是返回 Future 或 Task 实例的普通函数,那么可以这样写:res = yield from foo()。这是 asyncio 包的 API 中很多地方可以互换协程与期物的原因之一。

为了执行这些操作,必须排定协程的运行时间,然后使用 asyncio.Task 对象包装协程。对协程来说,获取 Task 对象有两种主要方式。

asyncio.async(coro_or_future, *, loop=None)

这个函数统一了协程和期物:第一个参数可以是二者中的任何一个。如果是 Future 或 Task 对象,那就原封不动地返回。如果是协程,那么 async 函数会调用 loop.create_task(...) 方法创建 Task 对象。loop= 关键字参数是可选的,用于传入事件循环;如果没有传入,那么 async 函数会通过调用 asyncio.get_event_loop() 函数获取循环对象。

BaseEventLoop.create_task(coro)

这个方法排定协程的执行时间,返回一个 asyncio.Task 对象。如果在自定义的 BaseEventLoop 子类上调用,返回的对象可能是外部库(如 Tornado)中与 Task 类兼容的某个类的实例。

 BaseEventLoop.create_task(...) 方法只在 Python 3.4.2 及以上版本中可用。如果是 Python 3.3 或 Python 3.4 的旧版,要使用 asyncio.async(...) 函数,或者从 PyPI 中安装较新的 asyncio 版本

asyncio 包中有多个函数会自动(内部使用的是 asyncio.async 函数)把参数指定的协程包装在 asyncio.Task 对象中,例如 BaseEventLoop.run_until_complete(...) 方法。

如果想在 Python 控制台或者小型测试脚本中试验期物和协程,可以使用下述代码片段:3

3摘自 Petr Viktorin 于 2014 年 9 月 11 日在 Python-ideas 邮件列表中发布的消息

  >>> import asyncio
  >>> def run_sync(coro_or_future):
  ...   loop = asyncio.get_event_loop()
  ...   return loop.run_until_complete(coro_or_future)
  ...
  >>> a = run_sync(some_coroutine())

在 asyncio 包的文档中,“18.5.3. Tasks and coroutines”一节说明了协程、期物和任务之间的关系。其中有个注解说道:

这份文档把一些方法说成是协程,即使它们其实是返回 Future 对象的普通 Python 函数。这是故意的,为的是给以后修改这些函数的实现留下余地。

掌握这些基础知识后,接下来要分析异步下载国旗的 flags_asyncio.py 脚本。这个脚本的用法在示例 17-1(第 17 章)中与依序下载版和线程池版一同演示过。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文