返回介绍

11.4 爬虫和中间件的变化

发布于 2024-01-30 22:48:37 字数 11371 浏览 0 评论 0 收藏 0

为了构建该系统,我们需要稍微对Scrapy爬虫进行修改,并且需要开发爬虫中间件。更具体地说,我们必须执行如下操作:

· 调整索引页爬取,以最大速率执行;

· 编写中间件,分批发送URL到Scrapyd服务器;

· 使用相同中间件,允许在启动时使用批量URL。

我们将尝试使用尽可能小的改动来实现这些变化。理想情况下,整个操作应该清晰、易理解并且对其依赖的爬虫代码透明。这应该是一个基础架构层级的需求,如果想对爬虫(可能数百个)进行修改来实现它则是一个坏主意。

11.4.1 索引页分片爬取

我们的第一步是优化索引页爬取,使其尽可能更快。在开始之前,先来设置一些期望。假设爬虫爬取并发量是16,并且我们测量得到其与源网站服务器的延迟大约为0.25秒。此时得到的吞吐量最多为16 / 0.25 = 64页/秒。索引页数量为50000个详情页 / 每个索引页30个详情页链接 = 1667索引页。因此,我们期望索引页下载花费的时间大约为1667 / 64 = 26秒多一点。

让我们以第3章中名为easy的爬虫开始。先把执行垂直抓取的Rule注释掉(callback='parse_item'的那个),因为现在只需要爬取索引页。

你可以在GitHub中获取到本书的全部代码。下载该代码,可以访问:git clone https://github.com/scalingexcellence/scrapybook。

本章中的完整代码位于ch11目录当中。

如果我们在进行任何优化之前对scrapy crawl只爬取10个页面的情况进行计时,可以得到如下结果。

$ ls
properties scrapy.cfg
$ pwd
/root/book/ch11/properties
$ time scrapy crawl easy -s CLOSESPIDER_PAGECOUNT=10
...
DEBUG: Crawled (200) <GET ...index_00000.html> (referer: None)
DEBUG: Crawled (200) <GET ...index_00001.html> (referer: ...index_00000.
html)
...
real 0m4.099s

如果10个页面就花费了4秒时间,那么就不可能在26秒时间内完成1,700个页面。通过查看日志,我们发现每个页面都来自于前一个页面的下一页链接,也就是说在任意时刻都只有至多一个页面正在执行爬取。我们的有效并发为1。我们希望并行处理,得到想要的并发数量(16个并发请求)。我们将对索引分片,并允许一些额外的分片,以确保爬虫中的URL不会不足。我们将会把索引分为20个段。实际上,任何超过16的数值都能够增加速度,不过在超过20之后所得到的回报呈递减趋势。我们将通过如下表达式计算每个分片的起始索引ID。

>>> map(lambda x: 1667 * x / 20, range(20))
[0, 83, 166, 250, 333, 416, 500, ... 1166, 1250, 1333, 1416, 1500, 1583]

因此,我们使用如下代码设置start_urls。

start_urls = ['http://web:9312/properties/index_%05d.html' % id
       for id in map(lambda x: 1667 * x / 20, range(20))]

这可能和你的索引有很大的不同,因此我们没必要在此处实现得更漂亮。如果还设定了并发设置(CONCURRENT_REQUESTS、CONCURRENT_REQUESTS_PER_DOMAIN)为16,那么当运行爬虫时,将会得到如下结果。

$ time scrapy crawl easy -s CONCURRENT_REQUESTS=16 -s CONCURRENT_
REQUESTS_PER_DOMAIN=16
...
real 0m32.344s

该结果已经与期望值非常接近了。我们的下载速度为 1667个页面 / 32秒 = 52个索引页/秒,这就意味着每秒可以生成52×30 = 1560个详情页URL。现在,可以将垂直抓取的Rule的注释取消掉,保存文件作为新爬虫分发。我们不需要对爬虫代码进行更多修改,这显示出我们即将开发的中间件的强大以及非侵入性。如果只使用开发服务器运行scrapy crawl,假设处理详情页的速度和索引页处理时一样快,那么它将花费不少于50000 / 52 = 16分钟时间完成爬取。

本节有两个关键内容。在学习完第10章之后,我们已经可以实现真正的工程。我们能够精确计算出系统期望得到的性能,并且确保在达到该性能之前不会停止(在合理范围内)。第二个要记住的重要事情是,由于索引页爬取提供了详情页,爬取的总吞吐量将会是其吞吐量的最小值。如果我们生成的URL比Scrapyd能够消费得更快,那么URL将会堆积在其队列当中。反过来,如果生成的URL太慢,Scrapyd将会拥有过剩的无法利用的能力。

11.4.2 分批爬取URL

现在,我们准备开发处理详情页URL的基础架构,目的是对其进行垂直爬取、分批并分发到多台Scrapyd节点中,而不是在本地爬取。

如果查看第8章中的Scrapy架构,就可以很容易地得出结论,这是爬虫中间件的任务,因为它实现了process_spider_output(),在到达下载器之前,在此处处理请求,并能够中止它们。我们在实现中限制只支持基于CrawlSpider的爬虫,另外还只支持简单的GET请求。如果需要更加复杂,比如POST或有权限验证的请求,那么需要开发更复杂的功能来扩展参数、请求头,甚至可能在每次批量运行后重新登录。

在开始之前,先来快速浏览一下Scrapy的GitHub。我们将回顾SPIDER_MIDDLEWARES_BASE设置,以查看Scrapy提供的参考实现,以便尽最大可能复用它。Scrapy 1.0包含如下爬虫中间件:HttpErrorMiddleware、OffsiteMiddleware、RefererMiddleware、UrlLengthMiddleware以及DepthMiddleware。在快速了解它们的实现之后,我们发现OffsiteMiddleware(只有60行代码)与想要实现的功能很相似。它根据爬虫的allowed_domains属性,把URL限制在某些特定域名中。我们可以使用相似的模式吗?和OffsiteMiddleware实现中丢弃URL不同,我们将对这些URL进行分批并发送到Scrapyd节点中。事实证明这是可以的。下面是实现的部分代码。

def __init__(self, crawler):
  settings = crawler.settings
  self._target = settings.getint('DISTRIBUTED_TARGET_RULE', -1)
  self._seen = set()
  self._urls = []
  self._batch_size = settings.getint('DISTRIBUTED_BATCH_SIZE', 1000)
  ...

def process_spider_output(self, response, result, spider):
  for x in result:
    if not isinstance(x, Request):
      yield x
    else:
      rule = x.meta.get('rule')

      if rule == self._target:
        self._add_to_batch(spider, x)
      else:
        yield x

def _add_to_batch(self, spider, request):
  url = request.url
  if not url in self._seen:
    self._seen.add(url)
    self._urls.append(url)
    if len(self._urls) >= self._batch_size:
      self._flush_urls(spider)

process_spider_output()既处理Item也处理Request。我们只想处理Request,因此我们对其他所有内容执行yield操作。如果查看CrawlSpider的源代码,就会注意到将Request / Response映射到Rule的方式是通过其meta字典的名为'rule'的整型字段。我们检查该数值,如果它指向目标的Rule(DISTRIBUTED_TARGET_RULE设置),则会调用_add_to_batch()添加URL到当前批次。然后,丢弃该Request。对其他所有Request执行yield操作,比如下一页链接、无变化的链接。_add_to_batch()方法实现了一个去重机制。不过很遗憾的是,由于前一节中描述的分片流程,我们可能对少数URL抽取两次。我们使用_seen集合检测并丢弃重复值。然后,把这些URL添加到_urls列表中,如果其大小超过_batch_size(DISTRIBUTED_BATCH_SIZE设置),就会触发调用_flush_urls()。该方法提供了如下的关键功能。

def __init__(self, crawler):
  ...
  self._targets = settings.get("DISTRIBUTED_TARGET_HOSTS")
  self._batch = 1
  self._project = settings.get('BOT_NAME')
  self._feed_uri = settings.get('DISTRIBUTED_TARGET_FEED_URL', None)
  self._scrapyd_submits_to_wait = []

def _flush_urls(self, spider):
  if not self._urls:
    return

  target = self._targets[(self._batch-1) % len(self._targets)]

  data = [
     ("project", self._project),
     ("spider", spider.name),
     ("setting", "FEED_URI=%s" % self._feed_uri),
     ("batch", str(self._batch)),
  ]

  json_urls = json.dumps(self._urls)
  data.append(("setting", "DISTRIBUTED_START_URLS=%s" % json_urls))

  d = treq.post("http://%s/schedule.json" % target,
         data=data, timeout=5, persistent=False)

  self._scrapyd_submits_to_wait.append(d)

  self._urls = []
  self._batch += 1

首先,它使用一个批次计数器(_batch)来决定要将该批次发送到哪个Scrapyd服务器中。我们在_targets(DISTRIBUTED_TARGET_HOSTS设置)中保持更新可用的服务器。然后,构造POST请求到Scrapyd的schedule.json。这比之前通过curl执行的更加高级,因为它传递了一些精心挑选的参数。基于这些参数,Scrapyd可以有效地计划运行任务,类似如下所示。

scrapy crawl distr \
-s DISTRIBUTED_START_URLS='[".../property_000000.html", ... ]' \
-s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' \
-a batch=1

除了项目和爬虫名外,我们还向爬虫传递了一个FEED_URI设置。我们可以从DISTRIBUTED_TARGET_FEED_URL设置中获取该值。

由于Scrapy支持FTP,我们可以让Scrapyd通过匿名FTP的方式将爬取到的Item文件上传到Spark服务器中。格式包含爬虫名(%(name)s)和时间(%(time)s)。如果只使用这些,那么当两个文件的创建时间相同时,最终会产生冲突。为了避免意外覆盖,我们还添加了一个%(batch)s参数。默认情况下,Scrapy不知道任何关于批次的事情,因此我们需要找到一种方式来设置该值。Scrapyd中schedule.json这个API的一个有趣特性是,如果参数不是设置或少数几个已知参数的话,它将会被作为参数传给爬虫。默认情况下,爬虫参数将会成为爬虫属性,未知的FEED_URI参数将会去查阅爬虫的属性。因此,通过传递batch参数给schedule.json,我们可以在FEED_URI中使用它以避免冲突。

最后一步是使用编码为JSON的该批次详情页URL编译为DISTRIBUTED_ START_URLS设置。除了熟悉和简单之外,使用该格式并没有什么特殊的理由。任何文本格式都可以做到。

通过命令行向Scrapy传输大量数据丝毫也不优雅。在一些时候,你想要将参数存储到数据存储中(比如Redis),并且只向Scrapy传输ID。如果想要这样做,则需要在_flush_urls()和process_start_requests()中做一些小的改变。

我们使用treq.post()处理POST请求。Scrapyd对持久化连接处理得不是很好,因此使用persistent=False禁用该功能。为了安全起见,我们还设置了一个5秒的超时时间。有趣的是,我们为该请求在_scrapyd_submits_to_wait列表中存储了延迟函数,后续内容中将会进行讲解。关闭该函数时,我们将重置_urls列表,并增加当前的_batch值。

出人意料的是,我们在关闭操作处理器中发现了如下所示的诸多功能。

def __init__(self, crawler):
  ...
  crawler.signals.connect(self._closed, signal=signals.spider_
closed)

@defer.inlineCallbacks
def _closed(self, spider, reason, signal, sender):
  # Submit any remaining URLs
  self._flush_urls(spider)

  yield defer.DeferredList(self._scrapyd_submits_to_wait)

_close()将会在我们按下Ctrl + C或爬取完成时被调用。无论哪种情况,我们都不希望丢失属于最后一个批次的任何URL,因为它们还没有被发送出去。这就是为什么我们在_close()方法中首先要做的是调用_flush_urls(spider)清空最后的批次的原因。第二个问题是,作为非阻塞代码,任何treq.post()在停止爬取时都可能完成或没有完成。为了避免丢失任何批次,我们将使用之前提及的scrapyd_submits_to_wait列表,来包含所有的treq.post()的延迟函数。我们使用defer.DeferredList()进行等待,直到全部完成。由于_close()使用了@defer.inlineCallbacks,我们只需对其执行yield操作,并在所有请求完成之后进行恢复即可。

总结来说,在DISTRIBUTED_START_URLS设置中包含批量URL的任务将被送往Scrapyd服务器,并在这些Scrapyd服务器中运行相同的爬虫。很明显,我们需要某种方式以使用该设置初始化start_urls。

11.4.3 从设置中获取初始URL

当你注意到爬虫中间件提供的用于处理爬虫给我们的start_requests的process_start_requests()方法时,就会感受到爬虫中间件是怎样满足我们的需求的。我们检测DISTRIBUTED_START_URLS设置是否已被设定,如果是的话,则解码JSON并使用其中的URL对相关的Request进行yield操作。对于这些请求,我们设置CrawlSpider的_response_download()方法作为回调,并设置meta['rule']参数,以便其Response能够被适当的Rule处理。坦白来说,我们查阅了Scrapy的源代码,发现CrawlSpider创建Request的方式使用了相同的方法。在本例中,代码如下所示。

def __init__(self, crawler):
  ...
  self._start_urls = settings.get('DISTRIBUTED_START_URLS', None)
  self.is_worker = self._start_urls is not None

def process_start_requests(self, start_requests, spider):
  if not self.is_worker:
    for x in start_requests:
      yield x
  else:
    for url in json.loads(self._start_urls):
      yield Request(url, spider._response_downloaded,
             meta={'rule': self._target})

我们的中间件已经准备好了。可以在settings.py中启用它并进行设置。

SPIDER_MIDDLEWARES = {
  'properties.middlewares.Distributed': 100,
}
DISTRIBUTED_TARGET_RULE = 1
DISTRIBUTED_BATCH_SIZE = 2000
DISTRIBUTED_TARGET_FEED_URL = ("ftp://anonymous@spark/"
                "%(batch)s_%(name)s_%(time)s.jl")
DISTRIBUTED_TARGET_HOSTS = [
  "scrapyd1:6800",
  "scrapyd2:6800",
  "scrapyd3:6800",
]

一些人可能会认为DISTRIBUTED_TARGET_RULE不应该作为设置,因为不同爬虫之间可能是不一样的。你可以将其认为是默认值,并且可以在爬虫中使用custom_settings属性进行覆写,比如:

custom_settings = {
  'DISTRIBUTED_TARGET_RULE': 3
}

不过在我们的例子中并不需要这么做。我们可以做一个测试运行,爬取作为设置提供的单个页面。

$ scrapy crawl distr -s \
DISTRIBUTED_START_URLS='["http://web:9312/properties/property_000000.html"]'

当爬取成功后,可以尝试更进一步,爬取页面后使用FTP传输给Spark服务器。

scrapy crawl distr -s \
DISTRIBUTED_START_URLS='["http://web:9312/properties/property_000000.  
html"]' \
-s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' -a batch=12

如果你通过ssh登录到Spark服务器中(稍后会有更多介绍),将会看到一个文件位于/root/items目录中,比如12_distr_date_time.jl。

上述是使用Scrapyd实现分布式爬取的中间件的示例实现。你可以使用它作为起点,实现满足自己特殊需求的版本。你可能需要适配的事情包括如下内容。

· 支持的爬虫类型。比如,一个不局限于CrawlSpider的替代方案可能需要你的爬虫通过适当的meta以及采用回调命名约定的方式来标记分布式请求。

· 向Scrapyd传输URL的方式。你可能希望使用特定域名信息来减少传输的信息量。比如,在本例中,我们只传输了房产的ID。

· 你可以使用更优雅的分布式队列解决方案,使爬虫能够从失败中恢复,并允许Scrapyd将更多的URL提交到批处理。

· 你可以动态填充目标服务器列表,以支持按需扩展。

11.4.4 在Scrapyd服务器中部署项目

为了能够在我们的3台Scrapyd服务器中部署爬虫,我们需要将这3台服务器添加到scrapy.cfg文件中。该文件中的每个[deploy:target-name]区域都定义了一个新的部署目标。

$ pwd
/root/book/ch11/properties
$ cat scrapy.cfg
...
[deploy:scrapyd1]
url = http://scrapyd1:6800/
[deploy:scrapyd2]
url = http://scrapyd2:6800/
[deploy:scrapyd3]
url = http://scrapyd3:6800/

可以通过scrapyd-deploy -l查询当前可用的目标。

$ scrapyd-deploy -l
scrapyd1       http://scrapyd1:6800/
scrapyd2       http://scrapyd2:6800/
scrapyd3       http://scrapyd3:6800/

通过scrapyd-deploy <target-name>,可以很容易地部署任意服务器。

$ scrapyd-deploy scrapyd1
Packing version 1449991257
Deploying to project "properties" in http://scrapyd1:6800/addversion.json
Server response (200):
{"status": "ok", "project": "properties", "version": "1449991257", 
"spiders": 2, "node_name": "scrapyd1"}

该过程会留给我们一些额外的目录和文件(build、project.egg-info、setup.py),我们可以安全地删除它们。本质上,scrapyd-deplo``y所做的事情就是打包你的项目,并使用addversion.json上传到目标Scrapyd服务器当中。

之后,当我们使用scrapyd-deploy -L查询单台服务器时,可以确认项目是否已经被成功部署,如下所示。

$ scrapyd-deploy -L scrapyd1
properties

我还在项目目录中使用touch创建了3个空文件(scrapyd1-3)。使用scrapyd*扩展为文件名称,同样也是目标服务器的名称。之后,你可以使用一个bash循环部署所有服务器:for i in scrapyd*; do scrapyd-deploy $i; done。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文