返回介绍

18.4 改进 asyncio 下载脚本

发布于 2024-02-05 21:59:47 字数 10726 浏览 0 评论 0 收藏 0

17.5 节说过,flags2 系列示例的命令行接口相同。本节要分析这个系列中的 flags2_asyncio.py 脚本。例如,示例 18-6 展示如何使用 100 个并发请求(-m 100)从 ERROR 服务器中下载 100 面国旗(-al 100)。

示例 18-6 运行 flags2_asyncio.py 脚本

$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100
ERROR site: http://localhost:8003/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
--------------------
73 flags downloaded.
27 errors.
Elapsed time: 0.64s

 测试并发客户端要谨慎

尽管线程版和 asyncio 版 HTTP 客户端的下载总时间相差无几,但是 asyncio 版发送请求的速度更快,因此很有可能对服务器发起 DoS 攻击。为了全速测试这些并发客户端,应该在本地搭建 HTTP 服务器,详情参见本书代码仓库中 17-futures/countries/ 目录里的 README.rst 文件

下面分析 flags2_asyncio.py 脚本的实现方式。

18.4.1 使用asyncio.as_completed函数

在示例 18-5 中,我把一个协程列表传给 asyncio.wait 函数,经由 loop.run_until_complete 方法驱动,全部协程运行完毕后,这个函数会返回所有下载结果。可是,为了更新进度条,各个协程运行结束后就要立即获取结果。在线程池版示例中(见示例 17-14),为了集成进度条,我们使用的是 as_completed 生成器函数;幸好,asyncio 包提供了这个生成器函数的相应版本。

为了使用 asyncio 包实现 flags2 示例,我们要重写几个函数;重写后的函数可以供 concurrent.future 版重用。之所以要重写,是因为在使用 asyncio 包的程序中只有一个主线程,而在这个线程中不能有阻塞型调用,因为事件循环也在这个线程中运行。所以,我要重写 get_flag 函数,使用 yield from 访问网络。现在,由于 get_flag 是协程, download_one 函数必须使用 yield from 驱动它,因此 download_one 自己也要变成协程。之前,在示例 18-5 中,download_one 由 download_many 驱动:download_one 函数由 asyncio. wait 函数调用,然后传给 loop.run_until_complete 方法。现在,为了报告进度并处理错误,我们要更精确地控制,所以我把 download_many 函数中的大多数逻辑移到一个新的协程 downloader_coro 中,只在 download_many 函数中设置事件循环,以及调度 downloader_coro 协程。

示例 18-7 展示的是 flags2_asyncio.py 脚本的前半部分,定义 get_flag 和 download_one 协程。示例 18-8 列出余下的源码,定义 downloader_coro 协程和 download_many 函数。

示例 18-7 flags2_asyncio.py:脚本的前半部分;余下的代码在示例 18-8 中

import asyncio
import collections

import aiohttp
from aiohttp import web
import tqdm

from flags2_common import main, HTTPStatus, Result, save_flag

# 默认设为较小的值,防止远程网站出错
# 例如503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


class FetchError(Exception):  ➊
  def __init__(self, country_code):
    self.country_code = country_code


@asyncio.coroutine
def get_flag(base_url, cc): ➋
  url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
  resp = yield from aiohttp.request('GET', url)
  if resp.status == 200:
    image = yield from resp.read()
    return image
  elif resp.status == 404:
    raise web.HTTPNotFound()
  else:
    raise aiohttp.HttpProcessingError(
      code=resp.status, message=resp.reason,
      headers=resp.headers)


@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):  ➌
  try:
    with (yield from semaphore):  ➍
      image = yield from get_flag(base_url, cc)  ➎
  except web.HTTPNotFound:  ➏
    status = HTTPStatus.not_found
    msg = 'not found'
  except Exception as exc:
    raise FetchError(cc) from exc  ➐
  else:
    save_flag(image, cc.lower() + '.gif')  ➑
    status = HTTPStatus.ok
    msg = 'OK'

  if verbose and msg:
    print(cc, msg)

  return Result(status, cc)

❶ 这个自定义的异常用于包装其他 HTTP 或网络异常,并获取 country_code,以便报告错误。

❷ get_flag 协程有三种返回结果:返回下载得到的图像;HTTP 响应码为 404 时,抛出 web.HTTPNotFound 异常;返回其他 HTTP 状态码时,抛出 aiohttp.HttpProcessingError 异常。

❸ semaphore 参数是 asyncio.Semaphore的实例。Semaphore 类是同步装置,用于限制并发请求数量。

❹ 在 yield from 表达式中把 semaphore 当成上下文管理器使用,防止阻塞整个系统:如果 semaphore 计数器的值是所允许的最大值,只有这个协程会阻塞。

❺ 退出这个 with 语句后,semaphore 计数器的值会递减,解除阻塞可能在等待同一个 semaphore 对象的其他协程实例。

❻ 如果没找到国旗,相应地设置 Result 的状态。

❼ 其他异常当作 FetchError 抛出,传入国家代码,并使用“PEP 3134 — Exception Chaining and Embedded Tracebacks”引入的 raise X from Y 句法链接原来的异常。

❽ 这个函数的作用是把国旗文件保存到硬盘中。

可以看出,与依序下载版相比,示例 18-7 中的 get_flag 和 download_one 函数改动幅度很大,因为现在这两个函数是协程,要使用 yield from 做异步调用。

对于我们分析的这种网络客户端代码来说,一定要使用某种限流机制,防止向服务器发起太多并发请求,因为如果服务器过载,那么系统的整体性能可能会下降。flags2_threadpool.py 脚本(见示例 17-14)限流的方法是,在 download_many 函数中实例化 ThreadPoolExecutor 类时把 max_workers 参数的值设为 concur_req,只在线程池中启动 concur_req 个线程。在 flags2_asyncio.py 脚本中我的做法是,在 downloader_coro 函数中创建一个 asyncio.Semaphore 实例(在后面的示例 18-8 中),然后把它传给示例 18-7 中 download_one 函数的 semaphore 参数。7

7感谢 Guto Maia 指出本书的草稿没有说明 Semaphore 类。

Semaphore 对象维护着一个内部计数器,若在对象上调用 .acquire() 协程方法,计数器则递减;若在对象上调用 .release() 协程方法,计数器则递增。计数器的初始值在实例化 Semaphore 时设定,如 downloader_coro 函数中的这一行所示:

semaphore = asyncio.Semaphore(concur_req)

如果计数器大于零,那么调用 .acquire() 方法不会阻塞;可是,如果计数器为零,那么 .acquire() 方法会阻塞调用这个方法的协程,直到其他协程在同一个 Semaphore 对象上调用 .release() 方法,让计数器递增。在示例 18-7 中,我没有调用 .acquire() 或 .release() 方法,而是在 download_one 函数中的下述代码块中把 semaphore 当作上下文管理器使用:

    with (yield from semaphore):
      image = yield from get_flag(base_url, cc)

这段代码保证,任何时候都不会有超过 concur_req 个 get_flag 协程启动。

现在来分析示例 18-8 中这个脚本余下的代码。注意,download_many 函数中以前的大多数功能现在都在 downloader_coro 协程中。我们必须这么做,因为必须使用 yield from 获取 asyncio.as_completed 函数产出的期物的结果,所以 as_completed 函数必须在协程中调用。可是,我不能直接把 download_many 函数改成协程,因为必须在脚本的最后一行把 download_many 函数传给 flags2_common 模块中定义的 main 函数,可 main 函数的参数不是协程,而是一个普通的函数。因此,我定义了 downloader_coro 协程,让它运行 as_completed 循环。现在,download_many 函数只用于设置事件循环,并把 downloader_coro 协程传给 loop.run_until_complete 方法,调度 downloader_coro。

示例 18-8 flags2_asyncio.py:接续示例 18-7

@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):  ➊
  counter = collections.Counter()
  semaphore = asyncio.Semaphore(concur_req)  ➋
  to_do = [download_one(cc, base_url, semaphore, verbose)
       for cc in sorted(cc_list)]  ➌

  to_do_iter = asyncio.as_completed(to_do)  ➍
  if not verbose:
    to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  ➎
  for future in to_do_iter:  ➏
    try:
      res = yield from future  ➐
    except FetchError as exc:  ➑
      country_code = exc.country_code  ➒
      try:
        error_msg = exc.__cause__.args[0]  ➓
      except IndexError:
        error_msg = exc.__cause__.__class__.__name__  ⓫
      if verbose and error_msg:
        msg = '*** Error for {}: {}'
        print(msg.format(country_code, error_msg))
      status = HTTPStatus.error
    else:
      status = res.status

    counter[status] += 1  ⓬

  return counter  ⓭


def download_many(cc_list, base_url, verbose, concur_req):
  loop = asyncio.get_event_loop()
  coro = downloader_coro(cc_list, base_url, verbose, concur_req)
  counts = loop.run_until_complete(coro)  ⓮
  loop.close()  ⓯
  return counts

if __name__ == '__main__':
  main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

❶ 这个协程的参数与 download_many 函数一样,但是不能直接调用,因为它是协程函数,而不是像 download_many 那样的普通函数。

❷ 创建一个 asyncio.Semaphore 实例,最多允许激活 concur_req 个使用这个计数器的协程。

❸ 多次调用 download_one 协程,创建一个协程对象列表。

❹ 获取一个迭代器,这个迭代器会在期物运行结束后返回期物。

❺ 把迭代器传给 tqdm 函数,显示进度。

❻ 迭代运行结束的期物;这个循环与示例 17-14 中 download_many 函数里的那个十分相似;不同的部分主要是异常处理,因为两个 HTTP 库(requests 和 aiohttp)之间有差异。

❼ 获取 asyncio.Future 对象的结果,最简单的方法是使用 yield from,而不是调用 future.result() 方法。

❽ download_one 函数抛出的各个异常都包装在 FetchError 对象里,并且链接原来的异常。

❾ 从 FetchError 异常中获取错误发生时的国家代码。

❿ 尝试从原来的异常(__cause__)中获取错误消息。

⓫ 如果在原来的异常中找不到错误消息,使用所链接异常的类名作为错误消息。

⓬ 记录结果。

⓭ 与其他脚本一样,返回计数器。

⓮ download_many 函数只是实例化 downloader_coro 协程,然后通过 run_until_complete 方法把它传给事件循环。

⓯ 所有工作做完后,关闭事件循环,返回 counts。

在示例 18-8 中不能像示例 17-14 那样把期物映射到国家代码上,因为 asyncio.as_completed 函数返回的期物与传给 as_completed 函数的期物可能不同。在 asyncio 包内部,我们提供的期物会被替换成生成相同结果的期物。8

8关于这一点的详细讨论,可以阅读我在 python-tulip 讨论组中发起的话题,题为“Which other futures my come out of asyncio.as_completed?”。Guido 回复了,而且深入分析了 as_completed 函数的实现,还说明了 asyncio 包中期物与协程之间的紧密关系。

因为失败时不能以期物为键从字典中获取国家代码,所以我实现了自定义的 FetchError 异常(如示例 18-7 所示)。FetchError 包装网络异常,并关联相应的国家代码,因此在详细模式中报告错误时能显示国家代码。如果没有错误,那么国家代码是 for 循环顶部那个 yield from future 表达式的结果。

我们使用 asyncio 包实现的这个示例与前面的 flags2_threadpool.py 脚本具有相同的功能,这一话题到此结束。接下来,我们要改进 flags2_asyncio.py 脚本,进一步探索 asyncio 包。

在分析示例 18-7 的过程中,我发现 save_flag 函数会执行硬盘 I/O 操作,而这应该异步执行。下一节说明做法。

18.4.2 使用Executor对象,防止阻塞事件循环

Python 社区往往会忽略一个事实——访问本地文件系统会阻塞,想当然地认为这种操作不会受网络访问的高延迟影响(这也极难预料)。与之相比,Node.js 程序员则始终谨记,所有文件系统函数都会阻塞,因为这些函数的签名中指明了要有回调。表 18-1 已经指出,硬盘 I/O 阻塞会浪费几百万个 CPU 周期,而这可能会对应用程序的性能产生重大影响。

在示例 18-7 中,阻塞型函数是 save_flag。在这个脚本的线程版中(见示例 17-14),save_flag 函数会阻塞运行 download_one 函数的线程,但是阻塞的只是众多工作线程中的一个。阻塞型 I/O 调用在背后会释放 GIL,因此另一个线程可以继续。但是在 flags2_asyncio.py 脚本中,save_flag 函数阻塞了客户代码与 asyncio 事件循环共用的唯一线程,因此保存文件时,整个应用程序都会冻结。这个问题的解决方法是,使用事件循环对象的 run_in_executor 方法。

asyncio 的事件循环在背后维护着一个 ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法,把可调用的对象发给它执行。若想在这个示例中使用这个功能,download_one 协程只有几行代码需要改动,如示例 18-9 所示。

示例 18-9 flags2_asyncio_executor.py:使用默认的 ThreadPoolExecutor 对象运行 save_flag 函数

@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
  try:
    with (yield from semaphore):
      image = yield from get_flag(base_url, cc)
  except web.HTTPNotFound:
    status = HTTPStatus.not_found
    msg = 'not found'
  except Exception as exc:
    raise FetchError(cc) from exc
  else:
    loop = asyncio.get_event_loop()  ➊
    loop.run_in_executor(None,  ➋
        save_flag, image, cc.lower() + '.gif')  ➌
    status = HTTPStatus.ok
    msg = 'OK'

  if verbose and msg:
    print(cc, msg)

  return Result(status, cc)

❶ 获取事件循环对象的引用。

❷ run_in_executor 方法的第一个参数是 Executor 实例;如果设为 None,使用事件循环的默认 ThreadPoolExecutor 实例。

❸ 余下的参数是可调用的对象,以及可调用对象的位置参数。

 我测试示例 18-9 时,没有发现改用 run_in_executor 方法保存图像文件后性能有明显变化,因为图像都不大(平均 13KB)。不过,如果编辑 flags2_common.py 脚本中的 save_flag 函数,把各个文件保存的字节数变成原来的 10 倍(只需把 fp.write(img) 改成 fp.write(img*10)),此时便会看到效果。下载的平均字节数变成 130KB 后,使用 run_in_executor 方法的好处就体现出来了。如果下载包含百万像素的图像,速度提升更明显。

如果需要协调异步请求,而不只是发起完全独立的请求,协程较之回调的好处会变得显而易见。下一节说明回调的问题,并给出解决方法。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文