返回介绍

8.5 示例2:测量吞吐量和延时的扩展

发布于 2024-01-30 22:48:37 字数 4687 浏览 0 评论 0 收藏 0

当我们在第9章中添加管道后,测量吞吐量(每秒的item数)和延时(计划后和下载后的时间)的变化是一件很有意思的事情。

Scrapy扩展中已经包含了一个测量吞吐量的扩展,即日志统计扩展(scrapy/extensions/logstats.py),我们将会以此为起点。要想测量延时,需要hook一些信号,包括request_scheduled、response_received和item_scraped。我们对每个信号记录时间戳,并通过累计多次取平均值的方式减去适当的计算延时。通过观察这些信号提供的回调参数,会发现一些讨厌的东西。item_scraped只在Response中获得,request_scheduled只在Request中获得,而response_received则是两者中都有。幸运的是,我们不需要任何特殊的技巧来传递这些值。每个Response都有一个Request成员,回指其Request,更好的是它拥有我们在第5章中看到的meta字典,它和原始Request中的一样,而不管是否存在重定向。非常好,我们可以在这里存储时间戳了!

实际上,这并不是我的主意。同样的机制已经在AutoThrottle扩展(scrapy/extensions/throttle.py)中使用了。在该扩展中,使用了request.meta.get('download_latency'),其中download_latency是在scrapy/core/downloader/webclient.py下载器中进行计算的。在编写中间件时,最快的改善方式就是让自己熟悉Scrapy默认的中间件代码。

下面是扩展的代码。

class Latencies(object):
  @classmethod
  def from_crawler(cls, crawler):
   return cls(crawler)

  def __init__(self, crawler):
   self.crawler = crawler
   self.interval = crawler.settings.getfloat('LATENCIES_INTERVAL')
   if not self.interval:
    raise NotConfigured
   cs = crawler.signals
   cs.connect(self._spider_opened, signal=signals.spider_opened)
   cs.connect(self._spider_closed, signal=signals.spider_closed)
   cs.connect(self._request_scheduled, signal=signals.request_scheduled)
   cs.connect(self._response_received, signal=signals.response_received)
   cs.connect(self._item_scraped, signal=signals.item_scraped)
   self.latency, self.proc_latency, self.items = 0, 0, 0

  def _spider_opened(self, spider):
   self.task = task.LoopingCall(self._log, spider)
   self.task.start(self.interval)

  def _spider_closed(self, spider, reason):
   if self.task.running:
     self.task.stop()

  def _request_scheduled(self, request, spider):
   request.meta['schedule_time'] = time()
  def _response_received(self, response, request, spider):
   request.meta['received_time'] = time()
  def _item_scraped(self, item, response, spider):
   self.latency += time() - response.meta['schedule_time']
   self.proc_latency += time() - response.meta['received_time']
   self.items += 1
  def _log(self, spider):
   irate = float(self.items) / self.interval
   latency = self.latency / self.items if self.items else 0
   proc_latency = self.proc_latency / self.items if self.items else 0
   spider.logger.info(("Scraped %d items at %.1f items/s, avg latency: "
    "%.2f s and avg time in pipelines: %.2f s") %
    (self.items, irate, latency, proc_latency))
   self.latency, self.proc_latency, self.items = 0, 0, 0

前两个方法非常重要,因为它们很通用。它们使用Crawler对象初始化中间件。你会发现这些代码几乎出现在每个重要的中间件当中。from_crawler(cls, crawler)是获取Crawler对象的方式。然后,可以注意到在__init__()方法中,访问了crawler.settings,并且会在其未设置时抛出NotConfigured异常。你会看到很多FooBar扩展,用于检查相应的FOOBAR_ENABLED设置,如果没有设置或者设置为False时,将会抛出异常。这是一种非常常见的模式,是为了方便将中间件包含在对应的settings.py设置中(比如ITEM_PIPELINES),但是默认情况下是禁用的,除非通过其对应的设置显式启用。许多默认的Scrapy中间件(比如AutoThrottle或HttpCache)都使用了这种模式。在本例中,我们的扩展会保持LATENCIES_INTERVAL的禁用状态,除非已经对其进行了设置。

在__init__()方法的后面一部分代码中,我们使用crawler.signals.connect(),为所有感兴趣的信号都注册了回调,并初始化了一些成员变量。这个类的剩余部分实现了信号处理器。在_spider_opened()中,我们初始化了一个计时器,会每隔LATENCIES_INTERVAL秒调用_log()方法;在_spider_closed()中,我们停止了该计时器。在_request_scheduled()和_response_received()中,我们在request.meta中存储了时间戳;而在_item_scraped()中,我们累计两次延时(从计划/接收开始直到当前时间),并递增抓取到的Item的数量。在_log()方法中,我们计算了平均值,格式化并打印出消息,重置累加器以开始另一个采样周期。

任何在多线程上下文中编写类似代码的人,都会意识到上述代码中没有使用互斥锁。本例可能还不是特别复杂,不过编写单线程代码仍然要更加简单,并且在更加复杂的场景下可以很好地扩展。

我们可以将该扩展的代码添加到latencies.py模块中,放到和settings.py同级的目录下。如果想要启用该扩展,只需在settings.py文件中添加如下两行。

EXTENSIONS = { 'properties.latencies.Latencies': 500, }
LATENCIES_INTERVAL = 5

我们可以像平时那样运行它。

$ pwd
/root/book/ch08/properties
$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000 -s LOG_LEVEL=INFO
...
INFO: Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
INFO: Scraped 0 items at 0.0 items/sec, average latency: 0.00 sec and 
average time in pipelines: 0.00 sec
INFO: Scraped 115 items at 23.0 items/s, avg latency: 0.84 s and avg time 
in pipelines: 0.12 s
INFO: Scraped 125 items at 25.0 items/s, avg latency: 0.78 s and avg time 
in pipelines: 0.12 s

日志的第一行来自日志统计扩展,而接下来的各行来自我们的扩展。可以看到吞吐量是每秒25个item,平均时延是0.78秒,我们在下载后几乎没有花费时间处理。通过利特尔法则,我们得到系统中item的数量为N = S · T = 43 · 0.45 ≌ 19。无论设置的CONCURRENT_REQUESTS和CONCURRENT_REQUESTS_PER_DOMAIN是多少,即便没有触及100%的CPU,出于某些原因,也不应该使其超过30。我们可以在第10章中了解更多相关内容。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文