11.4 爬虫和中间件的变化
为了构建该系统,我们需要稍微对Scrapy爬虫进行修改,并且需要开发爬虫中间件。更具体地说,我们必须执行如下操作:
· 调整索引页爬取,以最大速率执行;
· 编写中间件,分批发送URL到Scrapyd服务器;
· 使用相同中间件,允许在启动时使用批量URL。
我们将尝试使用尽可能小的改动来实现这些变化。理想情况下,整个操作应该清晰、易理解并且对其依赖的爬虫代码透明。这应该是一个基础架构层级的需求,如果想对爬虫(可能数百个)进行修改来实现它则是一个坏主意。
11.4.1 索引页分片爬取
我们的第一步是优化索引页爬取,使其尽可能更快。在开始之前,先来设置一些期望。假设爬虫爬取并发量是16,并且我们测量得到其与源网站服务器的延迟大约为0.25秒。此时得到的吞吐量最多为16 / 0.25 = 64页/秒。索引页数量为50000个详情页 / 每个索引页30个详情页链接 = 1667索引页。因此,我们期望索引页下载花费的时间大约为1667 / 64 = 26秒多一点。
让我们以第3章中名为easy的爬虫开始。先把执行垂直抓取的Rule注释掉(callback='parse_item'的那个),因为现在只需要爬取索引页。
你可以在GitHub中获取到本书的全部代码。下载该代码,可以访问:git clone https://github.com/scalingexcellence/scrapybook。
本章中的完整代码位于ch11目录当中。
如果我们在进行任何优化之前对scrapy crawl只爬取10个页面的情况进行计时,可以得到如下结果。
$ ls properties scrapy.cfg $ pwd /root/book/ch11/properties $ time scrapy crawl easy -s CLOSESPIDER_PAGECOUNT=10 ... DEBUG: Crawled (200) <GET ...index_00000.html> (referer: None) DEBUG: Crawled (200) <GET ...index_00001.html> (referer: ...index_00000. html) ... real 0m4.099s
如果10个页面就花费了4秒时间,那么就不可能在26秒时间内完成1,700个页面。通过查看日志,我们发现每个页面都来自于前一个页面的下一页链接,也就是说在任意时刻都只有至多一个页面正在执行爬取。我们的有效并发为1。我们希望并行处理,得到想要的并发数量(16个并发请求)。我们将对索引分片,并允许一些额外的分片,以确保爬虫中的URL不会不足。我们将会把索引分为20个段。实际上,任何超过16的数值都能够增加速度,不过在超过20之后所得到的回报呈递减趋势。我们将通过如下表达式计算每个分片的起始索引ID。
>>> map(lambda x: 1667 * x / 20, range(20)) [0, 83, 166, 250, 333, 416, 500, ... 1166, 1250, 1333, 1416, 1500, 1583]
因此,我们使用如下代码设置start_urls。
start_urls = ['http://web:9312/properties/index_%05d.html' % id for id in map(lambda x: 1667 * x / 20, range(20))]
这可能和你的索引有很大的不同,因此我们没必要在此处实现得更漂亮。如果还设定了并发设置(CONCURRENT_REQUESTS、CONCURRENT_REQUESTS_PER_DOMAIN)为16,那么当运行爬虫时,将会得到如下结果。
$ time scrapy crawl easy -s CONCURRENT_REQUESTS=16 -s CONCURRENT_ REQUESTS_PER_DOMAIN=16 ... real 0m32.344s
该结果已经与期望值非常接近了。我们的下载速度为 1667个页面 / 32秒 = 52个索引页/秒,这就意味着每秒可以生成52×30 = 1560个详情页URL。现在,可以将垂直抓取的Rule的注释取消掉,保存文件作为新爬虫分发。我们不需要对爬虫代码进行更多修改,这显示出我们即将开发的中间件的强大以及非侵入性。如果只使用开发服务器运行scrapy crawl,假设处理详情页的速度和索引页处理时一样快,那么它将花费不少于50000 / 52 = 16分钟时间完成爬取。
本节有两个关键内容。在学习完第10章之后,我们已经可以实现真正的工程。我们能够精确计算出系统期望得到的性能,并且确保在达到该性能之前不会停止(在合理范围内)。第二个要记住的重要事情是,由于索引页爬取提供了详情页,爬取的总吞吐量将会是其吞吐量的最小值。如果我们生成的URL比Scrapyd能够消费得更快,那么URL将会堆积在其队列当中。反过来,如果生成的URL太慢,Scrapyd将会拥有过剩的无法利用的能力。
11.4.2 分批爬取URL
现在,我们准备开发处理详情页URL的基础架构,目的是对其进行垂直爬取、分批并分发到多台Scrapyd节点中,而不是在本地爬取。
如果查看第8章中的Scrapy架构,就可以很容易地得出结论,这是爬虫中间件的任务,因为它实现了process_spider_output(),在到达下载器之前,在此处处理请求,并能够中止它们。我们在实现中限制只支持基于CrawlSpider的爬虫,另外还只支持简单的GET请求。如果需要更加复杂,比如POST或有权限验证的请求,那么需要开发更复杂的功能来扩展参数、请求头,甚至可能在每次批量运行后重新登录。
在开始之前,先来快速浏览一下Scrapy的GitHub。我们将回顾SPIDER_MIDDLEWARES_BASE设置,以查看Scrapy提供的参考实现,以便尽最大可能复用它。Scrapy 1.0包含如下爬虫中间件:HttpErrorMiddleware、OffsiteMiddleware、RefererMiddleware、UrlLengthMiddleware以及DepthMiddleware。在快速了解它们的实现之后,我们发现OffsiteMiddleware(只有60行代码)与想要实现的功能很相似。它根据爬虫的allowed_domains属性,把URL限制在某些特定域名中。我们可以使用相似的模式吗?和OffsiteMiddleware实现中丢弃URL不同,我们将对这些URL进行分批并发送到Scrapyd节点中。事实证明这是可以的。下面是实现的部分代码。
def __init__(self, crawler): settings = crawler.settings self._target = settings.getint('DISTRIBUTED_TARGET_RULE', -1) self._seen = set() self._urls = [] self._batch_size = settings.getint('DISTRIBUTED_BATCH_SIZE', 1000) ... def process_spider_output(self, response, result, spider): for x in result: if not isinstance(x, Request): yield x else: rule = x.meta.get('rule') if rule == self._target: self._add_to_batch(spider, x) else: yield x def _add_to_batch(self, spider, request): url = request.url if not url in self._seen: self._seen.add(url) self._urls.append(url) if len(self._urls) >= self._batch_size: self._flush_urls(spider)
process_spider_output()既处理Item也处理Request。我们只想处理Request,因此我们对其他所有内容执行yield操作。如果查看CrawlSpider的源代码,就会注意到将Request / Response映射到Rule的方式是通过其meta字典的名为'rule'的整型字段。我们检查该数值,如果它指向目标的Rule(DISTRIBUTED_TARGET_RULE设置),则会调用_add_to_batch()添加URL到当前批次。然后,丢弃该Request。对其他所有Request执行yield操作,比如下一页链接、无变化的链接。_add_to_batch()方法实现了一个去重机制。不过很遗憾的是,由于前一节中描述的分片流程,我们可能对少数URL抽取两次。我们使用_seen集合检测并丢弃重复值。然后,把这些URL添加到_urls列表中,如果其大小超过_batch_size(DISTRIBUTED_BATCH_SIZE设置),就会触发调用_flush_urls()。该方法提供了如下的关键功能。
def __init__(self, crawler): ... self._targets = settings.get("DISTRIBUTED_TARGET_HOSTS") self._batch = 1 self._project = settings.get('BOT_NAME') self._feed_uri = settings.get('DISTRIBUTED_TARGET_FEED_URL', None) self._scrapyd_submits_to_wait = [] def _flush_urls(self, spider): if not self._urls: return target = self._targets[(self._batch-1) % len(self._targets)] data = [ ("project", self._project), ("spider", spider.name), ("setting", "FEED_URI=%s" % self._feed_uri), ("batch", str(self._batch)), ] json_urls = json.dumps(self._urls) data.append(("setting", "DISTRIBUTED_START_URLS=%s" % json_urls)) d = treq.post("http://%s/schedule.json" % target, data=data, timeout=5, persistent=False) self._scrapyd_submits_to_wait.append(d) self._urls = [] self._batch += 1
首先,它使用一个批次计数器(_batch)来决定要将该批次发送到哪个Scrapyd服务器中。我们在_targets(DISTRIBUTED_TARGET_HOSTS设置)中保持更新可用的服务器。然后,构造POST请求到Scrapyd的schedule.json。这比之前通过curl执行的更加高级,因为它传递了一些精心挑选的参数。基于这些参数,Scrapyd可以有效地计划运行任务,类似如下所示。
scrapy crawl distr \ -s DISTRIBUTED_START_URLS='[".../property_000000.html", ... ]' \ -s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' \ -a batch=1
除了项目和爬虫名外,我们还向爬虫传递了一个FEED_URI设置。我们可以从DISTRIBUTED_TARGET_FEED_URL设置中获取该值。
由于Scrapy支持FTP,我们可以让Scrapyd通过匿名FTP的方式将爬取到的Item文件上传到Spark服务器中。格式包含爬虫名(%(name)s)和时间(%(time)s)。如果只使用这些,那么当两个文件的创建时间相同时,最终会产生冲突。为了避免意外覆盖,我们还添加了一个%(batch)s参数。默认情况下,Scrapy不知道任何关于批次的事情,因此我们需要找到一种方式来设置该值。Scrapyd中schedule.json这个API的一个有趣特性是,如果参数不是设置或少数几个已知参数的话,它将会被作为参数传给爬虫。默认情况下,爬虫参数将会成为爬虫属性,未知的FEED_URI参数将会去查阅爬虫的属性。因此,通过传递batch参数给schedule.json,我们可以在FEED_URI中使用它以避免冲突。
最后一步是使用编码为JSON的该批次详情页URL编译为DISTRIBUTED_ START_URLS设置。除了熟悉和简单之外,使用该格式并没有什么特殊的理由。任何文本格式都可以做到。
通过命令行向Scrapy传输大量数据丝毫也不优雅。在一些时候,你想要将参数存储到数据存储中(比如Redis),并且只向Scrapy传输ID。如果想要这样做,则需要在_flush_urls()和process_start_requests()中做一些小的改变。
我们使用treq.post()处理POST请求。Scrapyd对持久化连接处理得不是很好,因此使用persistent=False禁用该功能。为了安全起见,我们还设置了一个5秒的超时时间。有趣的是,我们为该请求在_scrapyd_submits_to_wait列表中存储了延迟函数,后续内容中将会进行讲解。关闭该函数时,我们将重置_urls列表,并增加当前的_batch值。
出人意料的是,我们在关闭操作处理器中发现了如下所示的诸多功能。
def __init__(self, crawler): ... crawler.signals.connect(self._closed, signal=signals.spider_ closed) @defer.inlineCallbacks def _closed(self, spider, reason, signal, sender): # Submit any remaining URLs self._flush_urls(spider) yield defer.DeferredList(self._scrapyd_submits_to_wait)
_close()将会在我们按下Ctrl + C或爬取完成时被调用。无论哪种情况,我们都不希望丢失属于最后一个批次的任何URL,因为它们还没有被发送出去。这就是为什么我们在_close()方法中首先要做的是调用_flush_urls(spider)清空最后的批次的原因。第二个问题是,作为非阻塞代码,任何treq.post()在停止爬取时都可能完成或没有完成。为了避免丢失任何批次,我们将使用之前提及的scrapyd_submits_to_wait列表,来包含所有的treq.post()的延迟函数。我们使用defer.DeferredList()进行等待,直到全部完成。由于_close()使用了@defer.inlineCallbacks,我们只需对其执行yield操作,并在所有请求完成之后进行恢复即可。
总结来说,在DISTRIBUTED_START_URLS设置中包含批量URL的任务将被送往Scrapyd服务器,并在这些Scrapyd服务器中运行相同的爬虫。很明显,我们需要某种方式以使用该设置初始化start_urls。
11.4.3 从设置中获取初始URL
当你注意到爬虫中间件提供的用于处理爬虫给我们的start_requests的process_start_requests()方法时,就会感受到爬虫中间件是怎样满足我们的需求的。我们检测DISTRIBUTED_START_URLS设置是否已被设定,如果是的话,则解码JSON并使用其中的URL对相关的Request进行yield操作。对于这些请求,我们设置CrawlSpider的_response_download()方法作为回调,并设置meta['rule']参数,以便其Response能够被适当的Rule处理。坦白来说,我们查阅了Scrapy的源代码,发现CrawlSpider创建Request的方式使用了相同的方法。在本例中,代码如下所示。
def __init__(self, crawler): ... self._start_urls = settings.get('DISTRIBUTED_START_URLS', None) self.is_worker = self._start_urls is not None def process_start_requests(self, start_requests, spider): if not self.is_worker: for x in start_requests: yield x else: for url in json.loads(self._start_urls): yield Request(url, spider._response_downloaded, meta={'rule': self._target})
我们的中间件已经准备好了。可以在settings.py中启用它并进行设置。
SPIDER_MIDDLEWARES = { 'properties.middlewares.Distributed': 100, } DISTRIBUTED_TARGET_RULE = 1 DISTRIBUTED_BATCH_SIZE = 2000 DISTRIBUTED_TARGET_FEED_URL = ("ftp://anonymous@spark/" "%(batch)s_%(name)s_%(time)s.jl") DISTRIBUTED_TARGET_HOSTS = [ "scrapyd1:6800", "scrapyd2:6800", "scrapyd3:6800", ]
一些人可能会认为DISTRIBUTED_TARGET_RULE不应该作为设置,因为不同爬虫之间可能是不一样的。你可以将其认为是默认值,并且可以在爬虫中使用custom_settings属性进行覆写,比如:
custom_settings = { 'DISTRIBUTED_TARGET_RULE': 3 }
不过在我们的例子中并不需要这么做。我们可以做一个测试运行,爬取作为设置提供的单个页面。
$ scrapy crawl distr -s \ DISTRIBUTED_START_URLS='["http://web:9312/properties/property_000000.html"]'
当爬取成功后,可以尝试更进一步,爬取页面后使用FTP传输给Spark服务器。
scrapy crawl distr -s \ DISTRIBUTED_START_URLS='["http://web:9312/properties/property_000000. html"]' \ -s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' -a batch=12
如果你通过ssh登录到Spark服务器中(稍后会有更多介绍),将会看到一个文件位于/root/items目录中,比如12_distr_date_time.jl。
上述是使用Scrapyd实现分布式爬取的中间件的示例实现。你可以使用它作为起点,实现满足自己特殊需求的版本。你可能需要适配的事情包括如下内容。
· 支持的爬虫类型。比如,一个不局限于CrawlSpider的替代方案可能需要你的爬虫通过适当的meta以及采用回调命名约定的方式来标记分布式请求。
· 向Scrapyd传输URL的方式。你可能希望使用特定域名信息来减少传输的信息量。比如,在本例中,我们只传输了房产的ID。
· 你可以使用更优雅的分布式队列解决方案,使爬虫能够从失败中恢复,并允许Scrapyd将更多的URL提交到批处理。
· 你可以动态填充目标服务器列表,以支持按需扩展。
11.4.4 在Scrapyd服务器中部署项目
为了能够在我们的3台Scrapyd服务器中部署爬虫,我们需要将这3台服务器添加到scrapy.cfg文件中。该文件中的每个[deploy:target-name]区域都定义了一个新的部署目标。
$ pwd /root/book/ch11/properties $ cat scrapy.cfg ... [deploy:scrapyd1] url = http://scrapyd1:6800/ [deploy:scrapyd2] url = http://scrapyd2:6800/ [deploy:scrapyd3] url = http://scrapyd3:6800/
可以通过scrapyd-deploy -l查询当前可用的目标。
$ scrapyd-deploy -l scrapyd1 http://scrapyd1:6800/ scrapyd2 http://scrapyd2:6800/ scrapyd3 http://scrapyd3:6800/
通过scrapyd-deploy <target-name>,可以很容易地部署任意服务器。
$ scrapyd-deploy scrapyd1 Packing version 1449991257 Deploying to project "properties" in http://scrapyd1:6800/addversion.json Server response (200): {"status": "ok", "project": "properties", "version": "1449991257", "spiders": 2, "node_name": "scrapyd1"}
该过程会留给我们一些额外的目录和文件(build、project.egg-info、setup.py),我们可以安全地删除它们。本质上,scrapyd-deplo``y所做的事情就是打包你的项目,并使用addversion.json上传到目标Scrapyd服务器当中。
之后,当我们使用scrapyd-deploy -L查询单台服务器时,可以确认项目是否已经被成功部署,如下所示。
$ scrapyd-deploy -L scrapyd1 properties
我还在项目目录中使用touch创建了3个空文件(scrapyd1-3)。使用scrapyd*扩展为文件名称,同样也是目标服务器的名称。之后,你可以使用一个bash循环部署所有服务器:for i in scrapyd*; do scrapyd-deploy $i; done。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论