返回介绍

9.3 使用 Twisted 专用客户端建立服务接口

发布于 2024-01-30 22:48:37 字数 5252 浏览 0 评论 0 收藏 0

到目前为止,我们看到了如何通过treq使用类REST API。Scrapy还可以和许多其他使用Twisted专用客户端的服务建立接口。比如,我们想要与MongoDB建立接口,当搜索"MongoDB Python"时,将会得到PyMongo,该库是阻塞/同步的,不能和Twisted一起使用,除非使用后续小节中的方法,在管道中描述线程,处理阻塞操作。如果搜索"MongoDB Twisted Python",将会得到txmongo,该库可以在Twisted和Scrapy中完美运行。通常情况下,Twisted客户端背后的社区都很小,但相比自行编写客户端,这仍然是一个更好的选择。我们将使用一个类似的Twisted专用客户端作为接口,处理Redis键值对存储。

9.3.1 用于读写Redis的管道

Google Geocoding API是按照IP进行限制的。我们可以利用多个IP(例如使用多台服务器)进行缓解,此时需要避免重复请求其他机器上已经完成地理编码的地址。这种情况也适用于之前运行中曾见到过的地址。我们不想浪费宝贵的限额。

请与API供应商沟通,确保在他们的策略下这种做法是可行的。比如,你可能必须每隔几分钟/小时就要丢弃掉缓存记录,或者根本不允许缓存。

我们可以使用Redis的键值对缓存,从本质上说,它是一个分布式的字典。我们已经在vagrant环境中运行了一个Redis实例,可以使用redis-cli命令,从开发机连接它并执行基本操作。

$ redis-cli -h redis
redis:6379> info keyspace
# Keyspace
redis:6379> set key value
OK
redis:6379> info keyspace
# Keyspace
db0:keys=1,expires=0,avg_ttl=0
redis:6379> FLUSHALL
OK
redis:6379> info keyspace
# Keyspace
redis:6379> exit

通过Google搜索"Redis Twisted",我们找到了txredisapi库。其本质区别是它不再是同步Python库的包装,而是适用于Twisted的库,它使用reactor.connectTCP()连接Redis、实现Twisted协议等。使用该库的方式与其他库类似,不过在Twisted应用中使用它时,其效率肯定会更高一些。我们在安装它时可以再附带一个工具库——dj_redis_url,该工具库用于解析Redis配置URL,我们可以使用pip进行安装(sudo pip install txredisapi dj_redis_url),和往常一样,在我们的开发机中也已经预先安装好了这些库。

可以按如下代码初始化RedisCache。

from txredisapi import lazyConnectionPool
class RedisCache(object):
...
  def __init__(self, crawler, redis_url, redis_nm):
    self.redis_url = redis_url
    self.redis_nm = redis_nm

    args = RedisCache.parse_redis_url(redis_url)
    self.connection = lazyConnectionPool(connectTimeout=5,
                       replyTimeout=5,
                       **args)
    crawler.signals.connect(
        self.item_scraped,signal=signals.item_scraped)

该管道非常简单。为了连接Redis服务器,我们需要主机地址、端口等参数,由于这些参数是以URL格式存储的,因此需要使用parse_redis_url()方法解析该格式(为简洁起见已经省略)。为键设置前缀作为命名空间的行为非常常见,在本例中,我们将其存储在redis_nm中。然后,使用txredisapi的lazyConnectionPool(),打开到服务器的连接。

最后一行使用了一个很有意思的函数。我们的目的是将地理编码管道与该管道包装起来。如果在Redis中没有某个值,我们将不会设置该值,我们的地理编码管道将像之前那样使用API对地址进行地理编码。在该操作完成之后,需要有一种方式在Redis中缓存这些键值对,在这里是通过连接到signals.item_scraped信号的方式实现的。我们定义的回调(item_scraped()方法,将很快看到)在非常靠后的位置被调用,此时坐标位置将会被设置。

本示例的完整代码位于 ch09/properties/properties/pipelines/redis.py。

我们通过查找和记录每个Item的地址和位置,保持了缓存的简单性。这对Redis来说是很有意义的,因为它经常运行在同一个服务器当中,这使得它运行速度非常快。如果不是这种情况,那么可能需要添加一个基于字典的缓存,与我们在地理编码管道中的实现类似。下面是处理传入的Item的方法。

@defer.inlineCallbacks
def process_item(self, item, spider):
  address = item["address"][0]
  key = self.redis_nm + ":" + address
  value = yield self.connection.get(key)
  if value:
    item["location"] = json.loads(value)
  defer.returnValue(item)

和大家的期望相同。我们得到了地址,为其添加前缀,然后使用txredisapi connection的get()方法在Redis中查询。我们在Redis中存储的值是JSON编码的对象。如果值已经设定,则使用JSON对其进行解码,并将其设为坐标位置。

当一个Item到达所有管道的结尾时,我们重新捕获它,确保存储到Redis的位置值当中。下面是实现代码。

from txredisapi import ConnectionError

def item_scraped(self, item, spider):
  try:
    location = item["location"]
    value = json.dumps(location, ensure_ascii=False)
  except KeyError:
    return

  address = item["address"][0]
  key = self.redis_nm + ":" + address
  quiet = lambda failure: failure.trap(ConnectionError)
  return self.connection.set(key, value).addErrback(quiet)

这里同样没有什么惊喜。如果我们找到一个位置,就可以得到地址,为其添加前缀,并使用它们作为键值对,用于txredisapi连接的set()方法。你会发现该函数没有使用@defer.inlineCallbacks,这是因为在处理signals.item_scraped时并不支持该装饰器。这就意味着无法再对connection.set()使用非常便捷的yield操作,不过我们可以做的工作是返回延迟操作,Scrapy可以用它串联任何未来的信号进行监听。无论何种情况,如果到Redis的连接无法执行connection.set(),就会抛出一个异常。可以通过添加自定义错误处理到connection.set()返回的延迟操作中,静默忽略该异常。在该错误处理中,我们将失败作为参数传递,并告知它们对任何ConnectionError执行trap()操作。这是Twisted的延迟操作API的一个非常好用的功能。通过在预期的异常中使用trap(),我们能够以紧凑的方式静默忽略它们。

为了启用该管道,我们所需做的就是将其添加到ITEM_PIPELINES设置中,并在settings.py文件中提供一个REDIS_PIPELINE_URL。为该管道设置一个比地理编码管道更小的优先级值非常重要,否则其运行就会太迟,无法起到作用。

ITEM_PIPELINES = { ...
  'properties.pipelines.redis.RedisCache': 300,
  'properties.pipelines.geo.GeoPipeline': 400,
...
REDIS_PIPELINE_URL = 'redis://redis:6379'

我们可以像平时那样运行该爬虫。第一次运行将会和之前类似,不过接下来的每次运行都会像下面这样。

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=100
...
INFO: Enabled item pipelines: TidyUp, RedisCache, GeoPipeline, 
MysqlWriter, EsWriter
...
Scraped... 0.0 items/s, avg latency: 0.00 s, time in pipelines: 0.00 s
Scraped... 21.2 items/s, avg latency: 0.78 s, time in pipelines: 0.15 s
Scraped... 24.2 items/s, avg latency: 0.82 s, time in pipelines: 0.16 s
...
INFO: Dumping Scrapy stats: {...
  'geo_pipeline/already_set': 106,
  'item_scraped_count': 106,

可以看到GeoPipeline和RedisCache都已经启用,并且RedisCache会首先进行。另外,还可以注意到geo_pipeline/already_set统计值是106。这些是GeoPipeline从Redis缓存中找到的预先填充好的item,并且它们都不需要请求Google API调用。如果Redis缓存为空,你会看到一些键依然会使用Google API进行处理。在性能方面,我们注意到GeoPipeline引发的初始行为现在没有了。实际上,由于目前使用了缓存,因此绕过了每秒5个请求的API限制。当使用Redis时,还应当考虑使用过期键,使系统可以周期性地刷新缓存数据。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文