返回介绍

17.5 显示下载进度并处理错误

发布于 2024-02-05 21:59:47 字数 14862 浏览 0 评论 0 收藏 0

前面说过,17.1 节中的几个脚本没有处理错误,这样做是为了便于阅读和比较三种方案(依序、多线程和异步)的结构。

为了处理各种错误,我创建了 flags2 系列示例。

flags2_common.py

这个模块中包含所有 flags2 示例通用的函数和设置,例如 main 函数,负责解析命令行参数、计时和报告结果。这个脚本中的代码其实是提供支持的,与本章的话题没有直接关系,因此我把源码放在附录 A 里的示例 A-10 中。

flags2_sequential.py

能正确处理错误,以及显示进度条的 HTTP 依序下载客户端。flags2_threadpool.py 脚本会用到这个模块里的 download_one 函数。

flags2_threadpool.py

基于 futures.ThreadPoolExecutor 类实现的 HTTP 并发客户端,演示如何处理错误,以及集成进度条。

flags2_asyncio.py

与前一个脚本的作用相同,不过使用 asyncio 和 aiohttp 实现。这个脚本在第 18 章的 18.4 节中分析。

 测试并发客户端时要小心

在公开的 HTTP 服务器上测试 HTTP 并发客户端时要小心,因为每秒可能会发起很多请求,这相当于是拒绝服务(DoS)攻击。我们不想攻击任何人,只是在学习如何开发高性能的客户端。访问公开的服务器时一定要管好自己的客户端。做高并发试验时,应该在本地架设 HTTP 服务器供测试。本书代码仓库中的 17-futures/countries/ 目录里有个 README.rst 文件,那里有架设说明。

flags2 系列示例最明显的特色是,有使用 TQDM 包实现的文本动画进度条。我在 YouTube 上发布了一个 108 秒的视频,展示了这个进度条,还对比了三个 flags 脚本的下载速度。在那个视频中,我先运行依序下载的脚本,不过 32 秒后中断了,因为那个脚本要用 5 分多钟访问 676 个 URL,下载 194 面国旗;然后,我分别运行多线程和 asyncio 版三次,每次都在 6 秒之内(即快了 60 多倍)完成任务。图 17-1 中有两个截图,分别是 flags2_threadpool.py 脚本运行中和运行结束后。

图 17-1:(左上)flags2_threadpool.py 运行中,显示着 tqdm 包生成的进度条;(右下)同一个终端窗口,脚本运行完毕后

TQDM 包特别易于使用,项目的 README.md 文件中有个 GIF 动画,演示了最简单的用法。安装 tqdm 包之后,8 在 Python 控制台中输入下述代码,会在注释那里看到进度条动画:

8可以使用 pip install tqdm 命令安装 tqdm 包。——编者注

>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(1000)):
...   time.sleep(.01)
...
>>> # -> 进度条会出现在这里 <-

除了这个灵巧的效果之外,tqdm 函数的实现方式也很有趣:能处理任何可迭代的对象,生成一个迭代器;使用这个迭代器时,显示进度条和完成全部迭代预计的剩余时间。为了计算预计剩余时间,tqdm 函数要获取一个能使用 len 函数确定大小的可迭代对象,或者在第二个参数中指定预期的元素数量。借助在 flags2 系列示例中集成 TQDM,我们可以深入了解这几个脚本的运作方式,因为我们必须使用 futures.as_completed 函数asyncio.as_completed 函数,这样 tqdm 函数才能在每个期物运行结束后更新进度。

flags2 系列示例的另一个特色是,提供了命令行接口。三个脚本接受的选项相同,运行任意一个脚本时指定 -h 选项就能看到所有选项。示例 17-8 显示的是帮助文本。

示例 17-8 flags2 系列脚本的帮助界面

$ python3 flags2_threadpool.py -h
usage: flags2_threadpool.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL]
              [-v]
              [CC [CC ...]]

Download flags for country codes. Default: top 20 countries by population.

positional arguments:
  CC          country code or 1st letter (eg. B for BA...BZ)

optional arguments:
  -h, --help      show this help message and exit
  -a, --all       get all available flags (AD to ZW)
  -e, --every       get flags for every possible code (AA...ZZ)
  -l N, --limit N     limit to N first codes
  -m CONCURRENT, --max_req CONCURRENT
            maximum concurrent requests (default=30)
  -s LABEL, --server LABEL
            Server to hit; one of DELAY, ERROR, LOCAL, REMOTE
            (default=LOCAL)
  -v, --verbose     output detailed progress info

所有选项都是可选的。下面说明最重要的选项。

不能忽略的选项是 -s/--server:用于选择测试时使用的 HTTP 服务器和基 URL。这个选项的值可以设为下述 4 个字符串(不区分大小写),用于确定脚本从哪里下载国旗。

LOCAL

使用 http://localhost:8001/flags;这是默认值。你应该配置一个本地 HTTP 服务器,响应 8001 端口的请求。我测试时使用 Nginx。本章示例代码中的 README.rst 文件说明了如何安装和配置 Nginx。

REMOTE

使用 http://flupy.org/data/flags;这是我搭建的公开网站,托管在一个共享服务器中。请不要使用太多并发请求访问这个网站。flupy.org 域名由 Cloudflare CDN 的一个免费账户管理,因此第一次下载时会发现很慢,不过一旦 CDN 有了缓存,速度就会变快。9

9测试这些脚本时,我向那个廉价的虚拟主机发起了一些并发请求,但是得到的响应是“HTTP 503 errors—Service Temporarily Unavailable”。后来我配置了 Cloudflare,现在没有这个错误了。

DELAY

使用 http://localhost:8002/flags;这是一个代理,会延迟 HTTP 响应,监听的端口是 8002。我在本地的 Nginx 服务器前加上了 Mozilla Vaurien,以此引入延迟。前面提到的那个 README.rst 文件中有运行 Vaurien 代理的说明。

ERROR

使用 http://localhost:8003/flags;这是一个代理,监听 8003 端口,引入了 HTTP 错误,并延迟响应。这个服务器使用的 Vaurien 配置与前面不同。

 仅当在本地架设 HTTP 服务器,并且监听 8001 端口时,才能使用 LOCAL 选项。DELAY 和 ERROR 选项需要代理,分别监听 8002 和 8003 端口。在 GitHub 上本书的代码仓库中有个 17-futures/countries/README.rst 文件,说明了如何配置 Nginx 和 Mozilla Vaurien,以实现这些选项的要求。

默认情况下,各个 flags2 脚本会使用默认的并发连接数(各脚本有所不同)从 LOCAL 服务器中下载人口最多的 20 个国家的国旗。示例 17-9 是全部使用默认值运行 flags2_sequential.py 脚本得到的输出。

示例 17-9 全部使用默认值运行 flags2_sequential.py 脚本:LOCAL 服务器,人口最多的 20 国国旗,1 个并发连接

$ python3 flags2_sequential.py
LOCAL site: http://localhost:8001/flags
Searching for 20 flags: from BD to VN
1 concurrent connection will be used.
--------------------
20 flags downloaded.
Elapsed time: 0.10s

我们可以使用多种不同的方式选择下载哪些国家的国旗。示例 17-10 展示如何下载国家代码以字母 A、B 或 C 开头的所有国旗。

示例 17-10 运行 flags2_threadpool.py 脚本,从 DELAY 服务器中下载国家代码以 A、B 或 C 开头的所有国旗

$ python3 flags2_threadpool.py -s DELAY a b c
DELAY site: http://localhost:8002/flags
Searching for 78 flags: from AA to CZ
30 concurrent connections will be used.
--------------------
43 flags downloaded.
35 not found.
Elapsed time: 1.72s

不管使用什么方式选择国家代码,下载的国旗数量都可以使用 -l/--limit 选项限制。示例 17-11 演示如何发起 100 个请求,结合 -a 和 -l 选项下载 100 面国旗。

示例 17-11 运行 flags2_asyncio.py 脚本,使用 100 个并发请求(-m 100)从 ERROR 服务器中下载100 面国旗(-al 100)

$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100
ERROR site: http://localhost:8003/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
--------------------
73 flags downloaded.
27 errors.
Elapsed time: 0.64s

以上是 flags2 系列示例的用户界面。下面分析实现方式。

17.5.1 flags2系列示例处理错误的方式

这三个示例在负责下载一个文件的函数(download_one)中使用相同的策略处理 HTTP 404 错误(未找到)。其他异常则向上冒泡,交给 download_many 函数处理。

我们还是先分析依序下载的代码,因为这些代码更易于理解,而且使用线程池的脚本重用了这里的大部分代码。示例 17-12 列出的是 flags2_sequential.py 和 flags2_threadpool.py 脚本真正用于下载的函数。

示例 17-12 flags2_sequential.py:负责下载的基本函数;flags2_threadpool.py 脚本重用了这两个函数

def get_flag(base_url, cc):
  url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
  resp = requests.get(url)
  if resp.status_code != 200:  ➊
    resp.raise_for_status()
  return resp.content


def download_one(cc, base_url, verbose=False):
  try:
    image = get_flag(base_url, cc)
  except requests.exceptions.HTTPError as exc:  ➋
    res = exc.response
    if res.status_code == 404:
       status = HTTPStatus.not_found  ➌
       msg = 'not found'
    else:  ➍
      raise
  else:
    save_flag(image, cc.lower() + '.gif')
    status = HTTPStatus.ok
    msg = 'OK'

  if verbose:  ➎
    print(cc, msg)

  return Result(status, cc)  ➏

❶ get_flag 函数没有处理错误,当 HTTP 代码不是 200 时,使用 requests.Response.raise_for_status 方法抛出异常。10

10HTTP 代码 200 表示成功完成 HTTP 请求。——编者注

❷ download_one 函数捕获 requests.exceptions.HTTPError 异常,特别处理 HTTP 404 错误……

❸ ……方法是,把局部变量 status 设为 HTTPStatus.not_found;HTTPStatus 是从 flags2_common 模块(见示例 A-10)中导入的 Enum 对象。

❹ 重新抛出其他 HTTPError 异常;这些异常会向上冒泡,传给调用方。

❺ 如果在命令行中设定了 -v/--verbose 选项,显示国家代码和状态消息;这就是详细模式中看到的进度信息。

➏ download_one 函数的返回值是一个 namedtuple——Result,其中有个 status 字段,其值是 HTTPStatus.not_found 或 HTTPStatus.ok。

示例 17-13 列出的是 download_many 函数的依序下载版。代码虽然简单,不过值得分析一下,以便后面与并发版对比。我们要关注的是报告进度、处理错误和统计下载数量的方式。

示例 17-13 flags2_sequential.py:实现依序下载的 download_many 函数

def download_many(cc_list, base_url, verbose, max_req):
  counter = collections.Counter()  ➊
  cc_iter = sorted(cc_list)  ➋
  if not verbose:
    cc_iter = tqdm.tqdm(cc_iter)  ➌
  for cc in cc_iter:  ➍
    try:
      res = download_one(cc, base_url, verbose)  ➎
    except requests.exceptions.HTTPError as exc:  ➏
      error_msg = 'HTTP error {res.status_code} - {res.reason}'
      error_msg = error_msg.format(res=exc.response)
    except requests.exceptions.ConnectionError as exc:  ➐
      error_msg = 'Connection error'
    else:  ➑
      error_msg = ''
      status = res.status

    if error_msg:
      status = HTTPStatus.error  ➒
    counter[status] += 1  ➓
    if verbose and error_msg:  ⓫
      print('*** Error for {}: {}'.format(cc, error_msg))

  return counter  ⓬

❶ 这个 Counter 实例用于统计不同的下载状态:HTTPStatus.ok、HTTPStatus.not_found 或 HTTPStatus.error。

❷ 按字母顺序传入的国家代码列表,保存在 cc_iter 变量中。

❸ 如果不是详细模式,把 cc_iter 传给 tqdm 函数,返回一个迭代器,产出 cc_iter 中的元素,还会显示进度条动画。

❹ 这个 for 循环迭代 cc_iter……

❺ ……不断调用 download_one 函数,执行下载。

❻ 处理 get_flag 函数抛出的与 HTTP 有关的且 download_one 函数没有处理的异常。

❼ 处理其他与网络有关的异常。其他异常会中止这个脚本,因为调用 download_many 函数的 flags2_common.main 函数中没有 try/except 块。

❽ 如果没有异常从 download_one 函数中逃出,从 download_one 函数返回的 namedtuple(HTTPStatus)中获取 status。

❾ 如果有错误,把局部变量 status 设为相应的状态。

❿ 以 HTTPStatus(一个 Enum)中的值为键,增加计数器。

⓫ 如果是详细模式,而且有错误,显示带有当前国家代码的错误消息。

⓬ 返回 counter,以便 main 函数能在最终的报告中显示数量。

下面分析重构后的线程池示例——flags2_threadpool.py。

17.5.2 使用futures.as_completed函数

为了集成 TQDM 进度条,并处理各次请求中的错误,flags2_threadpool.py 脚本用到我们见过的 futures.ThreadPoolExecutor 类和 futures.as_completed 函数。示例 17-14 是 flags2_threadpool.py 脚本的完整代码清单。这个脚本只实现了 download_many 函数,其他函数都重用 flags2_common 和 flags2_sequential 模块里的。

示例 17-14 flags2_threadpool.py:完整的代码清单

import collections
from concurrent import futures

import requests
import tqdm  ➊

from flags2_common import main, HTTPStatus  ➋
from flags2_sequential import download_one  ➌

DEFAULT_CONCUR_REQ = 30  ➍
MAX_CONCUR_REQ = 1000  ➎


def download_many(cc_list, base_url, verbose, concur_req):
  counter = collections.Counter()
  with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:  ➏
    to_do_map = {}  ➐
    for cc in sorted(cc_list):  ➑
      future = executor.submit(download_one,
              cc, base_url, verbose)  ➒
      to_do_map[future] = cc  ➓
    done_iter = futures.as_completed(to_do_map)  ⓫
    if not verbose:
      done_iter = tqdm.tqdm(done_iter, total=len(cc_list))  ⓬
    for future in done_iter:  ⓭
      try:
        res = future.result()  ⓮
      except requests.exceptions.HTTPError as exc:  ⓯
        error_msg = 'HTTP {res.status_code} - {res.reason}'
        error_msg = error_msg.format(res=exc.response)
      except requests.exceptions.ConnectionError as exc:
        error_msg = 'Connection error'
      else:
        error_msg = ''
        status = res.status

      if error_msg:
        status = HTTPStatus.error
      counter[status] += 1
      if verbose and error_msg:
        cc = to_do_map[future]  ⓰
        print('*** Error for {}: {}'.format(cc, error_msg))
  return counter


if __name__ == '__main__':
  main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

❶ 导入显示进度条的库。

❷ 从 flags2_common 模块中导入一个函数和一个 Enum。

❸ 重用 flags2_sequential 模块(见示例 17-12)里的 download_one 函数。

❹ 如果没有在命令行中指定 -m/--max_req 选项,使用这个值作为并发请求数的最大值,也就是线程池的大小;真实的数量可能会比这少,例如下载的国旗数量较少。

❺ 不管要下载多少国旗,也不管 -m/--max_req 命令行选项的值是多少,MAX_CONCUR_REQ 会限制最大的并发请求数;这是一项安全预防措施。

❻ 把 max_workers 设为 concur_req,创建 ThreadPoolExecutor 实例;main 函数会把下面这三个值中最小的那个赋值给 concur_req:MAX_CONCUR_REQ、cc_list 的长度、-m/--max_req 命令行选项的值。这样能避免创建超过所需的线程。

❼ 这个字典把各个 Future 实例(表示一次下载)映射到相应的国家代码上,在处理错误时使用。

❽ 按字母顺序迭代国家代码列表。结果的顺序主要由 HTTP 响应的时间长短决定,不过,如果线程池的大小(由 concur_req 设定)比 len(cc_list) 小得多,可能会发现有按字母顺序批量下载的情况。

❾ 每次调用 executor.submit 方法排定一个可调用对象的执行时间,然后返回一个 Future 实例。第一个参数是可调用的对象,其余的参数是传给可调用对象的参数。

❿ 把返回的 future 和国家代码存储在字典中。

⓫ futures.as_completed 函数返回一个迭代器,在期物运行结束后产出期物。

⓬ 如果不是详细模式,把 as_completed 函数返回的结果传给 tqdm 函数,显示进度条;因为 done_iter 没有 len 函数,所以我们必须通过 total= 参数告诉 tqdm 函数预期的元素数量,这样 tqdm 才能预计剩余的工作量。

⓭ 迭代运行结束后的期物。

⓮ 在期物上调用 result 方法,要么返回可调用对象的返回值,要么抛出可调用的对象在执行过程中捕获的异常。这个方法可能会阻塞,等待确定结果;不过,在这个示例中不会阻塞,因为 as_completed 函数只返回已经运行结束的期物。

⓯ 处理可能出现的异常;这个函数余下的代码与依序下载版 download_many 函数一样(见示例 17-13),不过下一点除外。

⓰ 为了给错误消息提供上下文,以当前的 future 为键,从 to_do_map 中获取国家代码。在依序下载版中无须这么做,因为那一版迭代的是国家代码,所以知道当前国家的代码;而这里迭代的是期物。

示例 17-14 用到了一个对 futures.as_completed 函数特别有用的惯用法:构建一个字典,把各个期物映射到其他数据(期物运行结束后可能有用)上。这里,在 to_do_map 中,我们把各个期物映射到对应的国家代码上。这样,尽管期物生成的结果顺序已经乱了,依然便于使用结果做后续处理。

Python 线程特别适合 I/O 密集型应用,concurrent.futures 模块大大简化了某些使用场景下 Python 线程的用法。我们对 concurrent.futures 模块基本用法的介绍到此结束。下面讨论不适合使用 ThreadPoolExecutor 或 ProcessPoolExecutor 类时,有哪些替代方案。

17.5.3 线程和多进程的替代方案

Python 自 0.9.8 版(1993 年)就支持线程了,concurrent.futures 只不过是使用线程的最新方式。Python 3 废弃了原来的 thread 模块,换成了高级的 threading 模块11 如果 futures.ThreadPoolExecutor 类对某个作业来说不够灵活,可能要使用 threading 模块中的组件(如 Thread、Lock、Semaphore 等)自行制定方案,比如说使用 queue 模块创建线程安全的队列,在线程之间传递数据。futures.ThreadPoolExecutor 类已经封装了这些组件。

11threading 模块自 Python 1.5.1(1998 年)就已存在,不过有些人仍然继续使用旧的 thread 模块。Python 3 把 thread 模块重命名为 _thread,以此强调这是低层实现,不应该在应用代码中使用。

对 CPU 密集型工作来说,要启动多个进程,规避 GIL。创建多个进程最简单的方式是,使用 futures.ProcessPoolExecutor 类。不过和前面一样,如果使用场景较复杂,需要更高级的工具。multiprocessing 模块的 API 与 threading 模块相仿,不过作业交给多个进程处理。对简单的程序来说,可以用 multiprocessing 模块代替 threading 模块,少量改动即可。不过,multiprocessing 模块还能解决协作进程遇到的最大挑战:在进程之间传递数据。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文