- 内容提要
- 前言
- 作者简介
- 封面简介
- 第1章 理解高性能 Python
- 第2章 通过性能分析找到瓶颈
- 2.1 高效地分析性能
- 2.2 Julia 集合的介绍
- 2.3 计算完整的 Julia 集合
- 2.4 计时的简单方法——打印和修饰
- 2.5 用 UNIX 的 time 命令进行简单的计时
- 2.6 使用 cProfile 模块
- 2.7 用 runsnakerun 对 cProfile 的输出进行可视化
- 2.8 用 line_profiler 进行逐行分析
- 2.9 用 memory_profiler 诊断内存的用量
- 2.10 用 heapy 调查堆上的对象
- 2.11 用 dowser 实时画出变量的实例
- 2.12 用 dis 模块检查 CPython 字节码
- 2.13 在优化期间进行单元测试保持代码的正确性
- 2.14 确保性能分析成功的策略
- 2.15 小结
- 第3章 列表和元组
- 第4章 字典和集合
- 第5章 迭代器和生成器
- 第6章 矩阵和矢量计算
- 第7章 编译成 C
- 第8章 并发
- 第9章 multiprocessing 模块
- 第10章 集群和工作队列
- 第11章 使用更少的 RAM
- 第12章 现场教训
8.6 数据库的例子
为了让前面的例子更具体,我们还制造了另一个玩具型的问题,主要是CPU密集型的但是包含了潜在的限制I/O的组件。我们将要计算素数,并把发现的素数存入一个数据库中。数据可以是任意的,问题则具有代表性,代表了你的程序有着要做的任何类型的重量级计算,那些计算的结果必须得存入一个数据库中,偷偷地招来重量级的I/O惩罚。我们对数据库施加的限制只有:
有HTTP的API,这样我们就能够使用像前面的例子中那样的代码[5]。
响应时间在50毫秒的级别。
数据库能够满足同一时刻处理多个请求[6]。
我们从一些简单的代码开始,计算素数并且每当发现了一个素数,就向数据库的HTTP API发起请求:
from tornado.httpclient import HTTPClient import math httpclient = HTTPClient() def save_prime_serial(prime): url = "http://127.0.0.1:8080/add?prime={}".format(prime) response = httpclient.fetch(url) finish_save_prime(response, prime) def finish_save_prime(response, prime): if response.code != 200: print "Error saving prime: {}".format(prime) def check_prime(number): if number % 2 == 0: return False for i in xrange(3, int(math.sqrt(number)) + 1, 2): if number % i == 0: return False return True def calculate_primes_serial(max_number): for number in xrange(max_number): if check_prime(number): save_prime_serial(number) return
正如我们在串行例子(例8-3)中的那样,每次数据库存储的请求时间(50毫秒)没有堆积,并且我们必须为我们所发现的每一个素数付出这个代价。结果就是,搜索到max_number = 8192(产生了1028个素数)花费了55.2秒。我们知道,无论怎样,正是因为我们的串行请求工作方式,我们至少花费51.4秒来做I/O! 所以,只是因为我们正在做I/O时暂停了程序,我们浪费了93%的时间。
作为替代,我们想做的事情就是找到改变我们请求模式的方式,这样我们就能同时异步地发出很多请求,我们就不需要如此难以承担的I/O等待。为了做到,我们创建了一个AsyncBatcher类来为我们处理批量的请求,并在需要时发出请求:
import grequests from itertools import izip class AsyncBatcher(object): __slots__ = ["batch", "batch_size", "save", "flush"] def __init__(self, batch_size): self.batch_size = batch_size self.batch = [] def save(self, prime): url = "http://127.0.0.1:8080/add?prime={}".format(prime) self.batch.append((url,prime)) if len(self.batch) == self.batch_size: self.flush() def flush(self): responses_futures = (grequests.get(url) for url, _ in self.batch) responses = grequests.map(responses_futures) for response, (url, prime) in izip(responses, self.batch): finish_save_prime(response, prime) self.batch = []
现在,我们能够以与我们之前所做的同样的方式前进。主要区别仅仅是我们给AsyncBatcher添加了我们的新素数,并让它来处理什么时候发送请求。此外,既然我们正在批量处理,我们必须确保发送最后那批,即使它还未满(意味着调用AsyncBatcher.flush())。
def calculate_primes_async(max_number): batcher = AsyncBatcher(100) # ❶ for number in xrange(max_number): if check_prime(number): batcher.save(number) batcher.flush() return
❶ 我们选择以100个请求为批次,原因与图8-3中所示的那样类似。
随着这个改变,我们能够把计算到max_number = 8192的运行时间降低到4.09秒。这就代表了13.5倍的速度提升,而没有做很多的工作。在一个类似实时数据处理的约束环境下,这种额外的速度就可能意味着区分一个系统是能跟上需求还是落后于需求(在这种情况下,需要有一个队列,你会在第10章学到这些内容)。
在图8-7中,我们能看到这些变化在不同的工作负荷中影响代码的运行时间。异步代码相对串行代码的速度提升是显著的,尽管我们还不是在原始的CPU问题上取得的提速。为了完全改进这个问题,我们需要使用像multiprocess之类的模块来以一个完全独立的进程来处理问题中的I/O负担部分,而不会去拖慢问题中的CPU运算部分。
图8-7 不同数量的素数处理时间
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论