返回介绍

8.6 数据库的例子

发布于 2024-01-25 21:44:08 字数 2967 浏览 0 评论 0 收藏 0

为了让前面的例子更具体,我们还制造了另一个玩具型的问题,主要是CPU密集型的但是包含了潜在的限制I/O的组件。我们将要计算素数,并把发现的素数存入一个数据库中。数据可以是任意的,问题则具有代表性,代表了你的程序有着要做的任何类型的重量级计算,那些计算的结果必须得存入一个数据库中,偷偷地招来重量级的I/O惩罚。我们对数据库施加的限制只有:

有HTTP的API,这样我们就能够使用像前面的例子中那样的代码[5]

响应时间在50毫秒的级别。

数据库能够满足同一时刻处理多个请求[6]

我们从一些简单的代码开始,计算素数并且每当发现了一个素数,就向数据库的HTTP API发起请求:

from tornado.httpclient import HTTPClient import math httpclient = HTTPClient() def save_prime_serial(prime): url = "http://127.0.0.1:8080/add?prime={}".format(prime) response = httpclient.fetch(url) finish_save_prime(response, prime) def finish_save_prime(response, prime): if response.code != 200: print "Error saving prime: {}".format(prime) def check_prime(number): if number % 2 == 0: return False for i in xrange(3, int(math.sqrt(number)) + 1, 2): if number % i == 0: return False return True def calculate_primes_serial(max_number): for number in xrange(max_number): if check_prime(number): save_prime_serial(number) return

正如我们在串行例子(例8-3)中的那样,每次数据库存储的请求时间(50毫秒)没有堆积,并且我们必须为我们所发现的每一个素数付出这个代价。结果就是,搜索到max_number = 8192(产生了1028个素数)花费了55.2秒。我们知道,无论怎样,正是因为我们的串行请求工作方式,我们至少花费51.4秒来做I/O! 所以,只是因为我们正在做I/O时暂停了程序,我们浪费了93%的时间。

作为替代,我们想做的事情就是找到改变我们请求模式的方式,这样我们就能同时异步地发出很多请求,我们就不需要如此难以承担的I/O等待。为了做到,我们创建了一个AsyncBatcher类来为我们处理批量的请求,并在需要时发出请求:

import grequests
from itertools import izip

class AsyncBatcher(object):
  __slots__ = ["batch", "batch_size", "save", "flush"]
  def __init__(self, batch_size):
    self.batch_size = batch_size
    self.batch = []

  def save(self, prime):
    url = "http://127.0.0.1:8080/add?prime={}".format(prime)
    self.batch.append((url,prime))
    if len(self.batch) == self.batch_size:
      self.flush()

  def flush(self):
    responses_futures = (grequests.get(url) for url, _ in self.batch)
    responses = grequests.map(responses_futures)
    for response, (url, prime) in izip(responses, self.batch):
      finish_save_prime(response, prime)
    self.batch = []

现在,我们能够以与我们之前所做的同样的方式前进。主要区别仅仅是我们给AsyncBatcher添加了我们的新素数,并让它来处理什么时候发送请求。此外,既然我们正在批量处理,我们必须确保发送最后那批,即使它还未满(意味着调用AsyncBatcher.flush())。

def calculate_primes_async(max_number):
  batcher = AsyncBatcher(100) # ❶
  for number in xrange(max_number):
    if check_prime(number):
      batcher.save(number)
  batcher.flush()
  return

❶ 我们选择以100个请求为批次,原因与图8-3中所示的那样类似。

随着这个改变,我们能够把计算到max_number = 8192的运行时间降低到4.09秒。这就代表了13.5倍的速度提升,而没有做很多的工作。在一个类似实时数据处理的约束环境下,这种额外的速度就可能意味着区分一个系统是能跟上需求还是落后于需求(在这种情况下,需要有一个队列,你会在第10章学到这些内容)。

在图8-7中,我们能看到这些变化在不同的工作负荷中影响代码的运行时间。异步代码相对串行代码的速度提升是显著的,尽管我们还不是在原始的CPU问题上取得的提速。为了完全改进这个问题,我们需要使用像multiprocess之类的模块来以一个完全独立的进程来处理问题中的I/O负担部分,而不会去拖慢问题中的CPU运算部分。

图8-7 不同数量的素数处理时间

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文