9.3 使用 Twisted 专用客户端建立服务接口
到目前为止,我们看到了如何通过treq使用类REST API。Scrapy还可以和许多其他使用Twisted专用客户端的服务建立接口。比如,我们想要与MongoDB建立接口,当搜索"MongoDB Python"时,将会得到PyMongo,该库是阻塞/同步的,不能和Twisted一起使用,除非使用后续小节中的方法,在管道中描述线程,处理阻塞操作。如果搜索"MongoDB Twisted Python",将会得到txmongo,该库可以在Twisted和Scrapy中完美运行。通常情况下,Twisted客户端背后的社区都很小,但相比自行编写客户端,这仍然是一个更好的选择。我们将使用一个类似的Twisted专用客户端作为接口,处理Redis键值对存储。
9.3.1 用于读写Redis的管道
Google Geocoding API是按照IP进行限制的。我们可以利用多个IP(例如使用多台服务器)进行缓解,此时需要避免重复请求其他机器上已经完成地理编码的地址。这种情况也适用于之前运行中曾见到过的地址。我们不想浪费宝贵的限额。
请与API供应商沟通,确保在他们的策略下这种做法是可行的。比如,你可能必须每隔几分钟/小时就要丢弃掉缓存记录,或者根本不允许缓存。
我们可以使用Redis的键值对缓存,从本质上说,它是一个分布式的字典。我们已经在vagrant环境中运行了一个Redis实例,可以使用redis-cli命令,从开发机连接它并执行基本操作。
$ redis-cli -h redis redis:6379> info keyspace # Keyspace redis:6379> set key value OK redis:6379> info keyspace # Keyspace db0:keys=1,expires=0,avg_ttl=0 redis:6379> FLUSHALL OK redis:6379> info keyspace # Keyspace redis:6379> exit
通过Google搜索"Redis Twisted",我们找到了txredisapi库。其本质区别是它不再是同步Python库的包装,而是适用于Twisted的库,它使用reactor.connectTCP()连接Redis、实现Twisted协议等。使用该库的方式与其他库类似,不过在Twisted应用中使用它时,其效率肯定会更高一些。我们在安装它时可以再附带一个工具库——dj_redis_url,该工具库用于解析Redis配置URL,我们可以使用pip进行安装(sudo pip install txredisapi dj_redis_url),和往常一样,在我们的开发机中也已经预先安装好了这些库。
可以按如下代码初始化RedisCache。
from txredisapi import lazyConnectionPool class RedisCache(object): ... def __init__(self, crawler, redis_url, redis_nm): self.redis_url = redis_url self.redis_nm = redis_nm args = RedisCache.parse_redis_url(redis_url) self.connection = lazyConnectionPool(connectTimeout=5, replyTimeout=5, **args) crawler.signals.connect( self.item_scraped,signal=signals.item_scraped)
该管道非常简单。为了连接Redis服务器,我们需要主机地址、端口等参数,由于这些参数是以URL格式存储的,因此需要使用parse_redis_url()方法解析该格式(为简洁起见已经省略)。为键设置前缀作为命名空间的行为非常常见,在本例中,我们将其存储在redis_nm中。然后,使用txredisapi的lazyConnectionPool(),打开到服务器的连接。
最后一行使用了一个很有意思的函数。我们的目的是将地理编码管道与该管道包装起来。如果在Redis中没有某个值,我们将不会设置该值,我们的地理编码管道将像之前那样使用API对地址进行地理编码。在该操作完成之后,需要有一种方式在Redis中缓存这些键值对,在这里是通过连接到signals.item_scraped信号的方式实现的。我们定义的回调(item_scraped()方法,将很快看到)在非常靠后的位置被调用,此时坐标位置将会被设置。
本示例的完整代码位于 ch09/properties/properties/pipelines/redis.py。
我们通过查找和记录每个Item的地址和位置,保持了缓存的简单性。这对Redis来说是很有意义的,因为它经常运行在同一个服务器当中,这使得它运行速度非常快。如果不是这种情况,那么可能需要添加一个基于字典的缓存,与我们在地理编码管道中的实现类似。下面是处理传入的Item的方法。
@defer.inlineCallbacks def process_item(self, item, spider): address = item["address"][0] key = self.redis_nm + ":" + address value = yield self.connection.get(key) if value: item["location"] = json.loads(value) defer.returnValue(item)
和大家的期望相同。我们得到了地址,为其添加前缀,然后使用txredisapi connection的get()方法在Redis中查询。我们在Redis中存储的值是JSON编码的对象。如果值已经设定,则使用JSON对其进行解码,并将其设为坐标位置。
当一个Item到达所有管道的结尾时,我们重新捕获它,确保存储到Redis的位置值当中。下面是实现代码。
from txredisapi import ConnectionError def item_scraped(self, item, spider): try: location = item["location"] value = json.dumps(location, ensure_ascii=False) except KeyError: return address = item["address"][0] key = self.redis_nm + ":" + address quiet = lambda failure: failure.trap(ConnectionError) return self.connection.set(key, value).addErrback(quiet)
这里同样没有什么惊喜。如果我们找到一个位置,就可以得到地址,为其添加前缀,并使用它们作为键值对,用于txredisapi连接的set()方法。你会发现该函数没有使用@defer.inlineCallbacks,这是因为在处理signals.item_scraped时并不支持该装饰器。这就意味着无法再对connection.set()使用非常便捷的yield操作,不过我们可以做的工作是返回延迟操作,Scrapy可以用它串联任何未来的信号进行监听。无论何种情况,如果到Redis的连接无法执行connection.set(),就会抛出一个异常。可以通过添加自定义错误处理到connection.set()返回的延迟操作中,静默忽略该异常。在该错误处理中,我们将失败作为参数传递,并告知它们对任何ConnectionError执行trap()操作。这是Twisted的延迟操作API的一个非常好用的功能。通过在预期的异常中使用trap(),我们能够以紧凑的方式静默忽略它们。
为了启用该管道,我们所需做的就是将其添加到ITEM_PIPELINES设置中,并在settings.py文件中提供一个REDIS_PIPELINE_URL。为该管道设置一个比地理编码管道更小的优先级值非常重要,否则其运行就会太迟,无法起到作用。
ITEM_PIPELINES = { ... 'properties.pipelines.redis.RedisCache': 300, 'properties.pipelines.geo.GeoPipeline': 400, ... REDIS_PIPELINE_URL = 'redis://redis:6379'
我们可以像平时那样运行该爬虫。第一次运行将会和之前类似,不过接下来的每次运行都会像下面这样。
$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=100 ... INFO: Enabled item pipelines: TidyUp, RedisCache, GeoPipeline, MysqlWriter, EsWriter ... Scraped... 0.0 items/s, avg latency: 0.00 s, time in pipelines: 0.00 s Scraped... 21.2 items/s, avg latency: 0.78 s, time in pipelines: 0.15 s Scraped... 24.2 items/s, avg latency: 0.82 s, time in pipelines: 0.16 s ... INFO: Dumping Scrapy stats: {... 'geo_pipeline/already_set': 106, 'item_scraped_count': 106,
可以看到GeoPipeline和RedisCache都已经启用,并且RedisCache会首先进行。另外,还可以注意到geo_pipeline/already_set统计值是106。这些是GeoPipeline从Redis缓存中找到的预先填充好的item,并且它们都不需要请求Google API调用。如果Redis缓存为空,你会看到一些键依然会使用Google API进行处理。在性能方面,我们注意到GeoPipeline引发的初始行为现在没有了。实际上,由于目前使用了缓存,因此绕过了每秒5个请求的API限制。当使用Redis时,还应当考虑使用过期键,使系统可以周期性地刷新缓存数据。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论