9.1 使用 REST API
REST是一套用于创建现代Web服务的技术,其主要优点是比SOAP或专有Web服务机制更加简单,更加轻量级。软件开发人员观察发现,Web服务经常提供的CRUD(创建、读取、更新、删除[Create、Read、Update、Delete])功能与HTTP基本操作(GET、POST、PUT、DELETE)具有相似性。另外,他们还发现典型的Web服务调用其所需的大部分信息时,都可以将其压缩到资源URL上。例如,http://api.mysite.com/ customer/john是一个资源URL,它可以让我们确定目标服务器(api.mysite.com),实际上我正在尝试在服务器上执行和customers(表)相关的操作,更具体的说就是执行和john(行——主键)相关的操作。当它与其他Web概念(如安全认证、无状态、缓存、使用XML或JSON作为载荷等)结合时,能够通过一种强大而又简单、熟悉且可以轻松跨平台的方式,提供和使用Web服务。难怪REST可以掀起软件行业的一场风暴。
9.1.1 使用treq
treq是一个Python包,相当于基于Twisted应用编写的Python requests包。它可以让我们轻松执行GET、POST以及其他HTTP请求。想要安装该包,可以使用pip install treq,不过它已经在我们的开发机中预先安装好了。
我们更倾向于选择treq而不是Scrapy的Request/crawler.engine.download()的原因是,虽然它们都很简单,但是在性能上treq更有优势,我们将会在第10章中看到更详细的介绍。
9.1.2 用于写入Elasticsearch的管道
首先,我们要编写一个将Item存储到ES(Elasticsearch)服务器的爬虫。你可能会觉得从ES开始,甚至先于MySQL,作为持久化机制进行讲解有些不太寻常,不过其实它是我们可以做的最简单的事情。ES可以是无模式的,也就是说无需任何配置就能够使用它。对于我们这个(非常简单的)用例来说,treq也已经足够使用。如果想要使用更高级的ES功能,则需要考虑使用txes2或其他Python/Twisted ES包。
在我们的开发机中,已经包含正在运行的ES服务器了。下面登录到开发机中,验证其是否正在正常运行。
$ curl http://es:9200 { "name" : "Living Brain", "cluster_name" : "elasticsearch", "version" : { ... }, "tagline" : "You Know, for Search" }
在宿主机浏览器中,访问http://localhost:9200,也可以看到同样的结果。当访问http://localhost:9200/properties/property/_search时,可以看到返回的响应表示ES进行了全局性的尝试,但是没有找到任何与房产信息相关的索引。恭喜你,刚刚已经使用了ES的REST API。
在本章,我们将在properties集合中插入房产信息。你可能需要重置properties集合,此时可以使用curl执行DELETE请求:
$ curl -XDELETE http://es:9200/properties
本章中管道实现的完整代码包含很多额外的细节,如更多的错误处理等,不过我将通过凸显关键点的方式,保持这里的代码简洁。
本章在ch09目录当中,其中本示例的代码为ch09/properties/properties/pipelines/es.py。
从本质上说,爬虫代码只包含如下4行。
@defer.inlineCallbacks def process_item(self, item, spider): data = json.dumps(dict(item), ensure_ascii=False).encode("utf- 8") yield treq.post(self.es_url, data)
其中,前两行用于定义标准的process_item()方法,可以在其中yield延迟操作(参考第8章)。
第 3 行用于准备要插入的data。首先,我们将Item转化为字典。然后使用json.dumps()将其编码为JSON格式。ensure_ascii=False的目的是通过不转义非ASCII字符,使得输出更加紧凑。然后,将这些JSON字符串编码为UTF-8,即JSON标准中的默认编码。
最后一行使用treq的post()方法执行POST请求,将文档插入到ElasticSearch中。es_url存储在settings.py文件当中(ES_PIPELINE_URL设置),如http:// es:9200/properties/property,可以提供一些基本信息,如ES服务器的IP和端口(es:9200)、集合名称(properties)以及想要写入的对象类型(property)。
要想启用该管道,需要将其添加到settings.py文件的ITEM_PIPELINES设置当中,并且使用ES_PIPELINE_URL设置进行初始化。
ITEM_PIPELINES = { 'properties.pipelines.tidyup.TidyUp': 100, 'properties.pipelines.es.EsWriter': 800, } ES_PIPELINE_URL = 'http://es:9200/properties/property'
完成上述工作后,我们可以进入到适当的目录当中。
$ pwd /root/book/ch09/properties $ ls properties scrapy.cfg
然后,开始运行爬虫。
$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=90 ... INFO: Enabled item pipelines: EsWriter... INFO: Closing spider (closespider_itemcount)... 'item_scraped_count': 106,
如果现在再次访问http://localhost:9200/properties/ property/_ search,可以在响应的hits/total字段中看到已经插入的条目数量,以及前10条结果。我们还可以通过添加?size=100参数取得更多结果。在搜索URL中添加q=参数时,可以在全部或特定字段中搜索指定关键词。最相关的结果将会出现在最前面。例如,http://localhost: 9200/properties/property/_search?q=title: london,将会返回标题中包含"London"的房产信息。对于更加复杂的查询,可以查阅 ES 的官方文档,网址为:https://www.elastic.co/guide/en/elasticsearch/reference/current/
query-dsl-query-string-query.html。
ES不需要配置的原因是它可以根据我们提供的第一个属性自动检测模式(字段类型)。通过访问http://localhost:9200/properties/,可以看到其自动检测的映射关系。
让我们快速查看一下性能,使用上一章结尾处给出的方式重新运行scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000。平均延时从0.78秒增长到0.81秒,这是因为管道的平均时间从0.12秒增长到了0.15秒。吞吐量仍然保持在每秒大约25个Item。
使用管道将Item插入到数据库当中是不是一个好主意呢?答案是否定的。通常情况下,数据库提供的批量插入条目的方式可以有几个数量级的效率提升,因此我们应当使用这种方式。也就是说,应当将Item打包批量插入,或在爬虫结束时以后置处理的步骤执行插入。我们将在最后一章中看到这些方法。不过,许多人仍然使用Item管道插入数据库,此时使用Twisted API而不是通用/阻塞的方法实现该方案才是正确的方式。
9.1.3 使用Google Geocoding API实现地理编码的管道
每个房产信息都有地区名称,因此我们想对其进行地理编码,也就是说找到它们对应的坐标(经度、纬度)。我们可以使用这些坐标将房产信息放到地图上,或是根据它们到某个位置的距离对搜索结果进行排序。开发这种功能需要复杂的数据库、文本匹配以及空间计算。而使用Google的Geocoding API,可以避免上面提到的几个问题。可以通过浏览器或curl打开下述URL以获取数据。
$ curl "https://maps.googleapis.com/maps/api/geocode/json?sensor=false&ad dress=london" { "results" : [ ... "formatted_address" : "London, UK", "geometry" : { ... "location" : { "lat" : 51.5073509, "lng" : -0.1277583 }, "location_type" : "APPROXIMATE", ... ], "status" : "OK" }
我们可以看到一个JSON对象,当搜索"location"时,可以很快发现Google提供的是伦敦中心坐标。如果继续搜索,会发现同一文档中还包含其他位置。其中,第一个坐标位置是最相关的。因此,如果存在results[0].geometry.location的话,它就是我们所需要的信息。
Google的Geocoding API可以使用之前用过的技术(treq)进行访问。只需几行,就可以找出一个地址的坐标位置(查看pipeline目录的geo.py文件),其代码如下。
@defer.inlineCallbacks def geocode(self, address): endpoint = 'http://web:9312/maps/api/geocode/json' parms = [('address', address), ('sensor', 'false')] response = yield treq.get(endpoint, params=parms) content = yield response.json() geo = content['results'][0]["geometry"]["location"] defer.returnValue({"lat": geo["lat"], "lon": geo["lng"]})
该函数使用了一个和前面用过的URL相似的URL,不过在这里将其指向到一个假的实现,以使其执行速度更快,侵入性更小,可离线使用并且更加可预测。可以使用endpoint = 'https://maps.googleapis.com/maps/api/geocode/json'来访问Google的服务器,不过需要记住的是Google对请求有严格的限制。address和sensor的值都通过treq的get()方法的params参数进行了自动URL编码。treq.get()方法返回了一个延迟操作,我们对其执行yield操作,以便在响应可用时恢复它。对response.json()的第二个yield操作,用于等待响应体加载完成并解析为Python对象。此时,我们可以得到第一个结果的位置信息,将其格式化为字典后,使用defer.returnValue()返回,该方法是从使用inlineCallbacks的方法返回值的最适当的方式。如果任何地方存在问题,该方法会抛出异常,并通过Scrapy报告给我们。
通过使用geocode(),process_item()可以变为一行代码,如下所示。
item["location"] = yield self.geocode(item["address"][0])
我们可以在ITEM_PIPELINES设置中添加并启用该管道,其优先级数值应当小于ES的优先级数值,以便ES获取坐标位置的值。
ITEM_PIPELINES = { ... 'properties.pipelines.geo.GeoPipeline': 400,
我们启用调试数据,运行一个快速的爬虫。
$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=90 -L DEBUG ... {'address': [u'Greenwich, London'], ... 'image_urls': [u'http://web:9312/images/i06.jpg'], 'location': {'lat': 51.482577, 'lon': -0.007659}, 'price': [1030.0], ...
现在,可以看到Item中包含了location字段。太好了!不过当使用真实的Google API的URL临时运行它时,很快就会得到类似下面的异常。
File "pipelines/geo.py" in geocode (content['status'], address)) Exception: Unexpected status="OVER_QUERY_LIMIT" for address="*London"
这是我们在完整代码中放入的一个检查,用于确保Geocoding API的响应中status字段的值是OK。如果该值非真,则说明我们得到的返回数据不是期望的格式,无法被安全使用。在本例中,我们得到了OVER_QUERY_LIMIT状态,可以清楚地说明在什么地方发生了错误。这可能是我们在许多案例中都会面临的一个重要问题。由于Scrapy的引擎具备较高的性能,缓存和资源请求的限流成为了必须考虑的问题。
可以访问Geocoder API的文档来了解其限制:“免费用户API:每24小时允许2500个请求,每秒允许5个请求”。即使使用了Google Geocoding API的付费版本,仍然会有每秒10个请求的限流,这就意味着该讨论仍然是有意义的。
下面的实现看起来可能会比较复杂,但是它们必须在上下文中进行判断。而在典型的多线程环境中创建此类组件需要线程池和同步,这样就会产生更加复杂的代码。
下面是使用Twisted技术实现的一个简单而又足够好用的限流引擎。
class Throttler(object): def __init__(self, rate): self.queue = [] self.looping_call = task.LoopingCall(self._allow_one) self.looping_call.start(1. / float(rate)) def stop(self): self.looping_call.stop() def throttle(self): d = defer.Deferred() self.queue.append(d) return d def _allow_one(self): if self.queue: self.queue.pop(0).callback(None)
该代码中,延迟操作排队进入列表中,每次调用_allow_one()时依次触发它们;_allow_one()检查队列是否为空,如果不是,则调用最旧的延迟操作的callback()(先入先出,FIFO)。我们使用Twisted的task.LoopingCall() API周期性调用_allow_one()。使用Throttler非常简单。我们可以在管道的__init__中对其进行初始化,并在爬虫结束时对其进行清理。
class GeoPipeline(object): def __init__(self, stats): self.throttler = Throttler(5) # 5 Requests per second def close_spider(self, spider): self.throttler.stop()
在使用想要限流的资源之前(在本例中为在process_item()中调用geocode()),需要对限流器的throttle()方法执行yield操作。
yield self.throttler.throttle() item["location"] = yield self.geocode(item["address"][0])
在第一个yield时,代码将会暂停,等待足够的时间过去之后再恢复。比如,某个时刻共有11个延迟操作在队列中,我们的速率限制是每秒5个请求,我们的代码将会在队列清空时恢复,大约为11/5=2.2秒。
使用Throttler后,我们不再会发生错误,但是爬虫速度会变得非常慢。通过观察发现,示例的房产信息中只有有限的几个不同位置。这是使用缓存的一个非常好的机会。我们可以使用一个简单的Python字典来实现缓存,不过这种情况下将会产生竞态条件,导致不正确的API调用。下面是一个没有该问题的缓存,此外还演示了一些Python和Twisted的有趣特性。
class DeferredCache(object): def __init__(self, key_not_found_callback): self.records = {} self.deferreds_waiting = {} self.key_not_found_callback = key_not_found_callback @defer.inlineCallbacks def find(self, key): rv = defer.Deferred() if key in self.deferreds_waiting: self.deferreds_waiting[key].append(rv) else: self.deferreds_waiting[key] = [rv] if not key in self.records: try: value = yield self.key_not_found_callback(key) self.records[key] = lambda d: d.callback(value) except Exception as e: self.records[key] = lambda d: d.errback(e) action = self.records[key] for d in self.deferreds_waiting.pop(key): reactor.callFromThread(action, d) value = yield rv defer.returnValue(value)
该缓存看起来和人们通常期望的有些不同。它包含两个组成部分。
· self.deferreds_waiting:这是一个延迟操作的队列,等待指定键的值。
· self.records:这是已经出现的键-操作对的字典。
如果查看find()实现的中间部分,就会发现如果没有在self.records中找到一个键,则会调用一个预定义的callback函数,取得缺失值(yield self.key_not_found_callback(key))。该回调函数可能会抛出一个异常。我们要如何在Python中以紧凑的方式存储这些值或异常呢?由于Python是一种函数式语言,我们可以根据是否出现异常,在self.records中存储调用延迟操作的callback或errback的小函数(lambda)。在定义时,该值或异常被附加到lambda函数中。函数中对变量的依赖被称为闭包,这是大多数函数式编程语言最显著和强大的特性之一。
缓存异常有些不太常见,不过这意味着如果在第一次查找某个键时,key_not_found_callback(key)抛出了异常,那么接下来对相同键再次查询时仍然会抛出同样的异常,不需要再执行额外的调用。
find()实现的剩余部分提供了避免竞态条件的机制。如果要查询的键已经在进程当中,将会在self.deferreds_waiting字典中有记录。在这种情况下,我们不再额外调用key_not_found_callback(),只是添加到延迟操作列表中,等待该键。当key_not_found_callback()返回,并且该键的值变为可用时,触发每个等待该键的延迟操作。我们可以直接执行action(d),而不是使用reactor.callFromThread(),不过这样就必须处理所有抛出的异常,并且会创建一个不必要的长延迟链。
使用缓存非常简单。只需在__init__()中对其初始化,并在执行API调用时设置回调函数即可。在process_item()中,按照如下代码使用缓存。
def __init__(self, stats): self.cache = DeferredCache(self.cache_key_not_found_callback) @defer.inlineCallbacks def cache_key_not_found_callback(self, address): yield self.throttler.enqueue() value = yield self.geocode(address) defer.returnValue(value) @defer.inlineCallbacks def process_item(self, item, spider): item["location"] = yield self.cache.find(item["address"][0]) defer.returnValue(item)
本例的完整代码包含了更多的错误处理代码,能够对限流导致的错误重试调用(一个简单的while循环),并且还包含了更新爬虫状态的代码。
本例的完整代码文件地址为:ch09/properties/properties/pipelines/geo2.py。
要想启用该管道,需要禁用(注释掉)之前的实现,并且在settings.py文件的ITEM_PIPELINES中添加如下代码。
ITEM_PIPELINES = { 'properties.pipelines.tidyup.TidyUp': 100, 'properties.pipelines.es.EsWriter': 800, # DISABLE 'properties.pipelines.geo.GeoPipeline': 400, 'properties.pipelines.geo2.GeoPipeline': 400, }
然后,可以按照如下代码运行该爬虫。
$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000 ... Scraped... 15.8 items/s, avg latency: 1.74 s and avg time in pipelines: 0.94 s Scraped... 32.2 items/s, avg latency: 1.76 s and avg time in pipelines: 0.97 s Scraped... 25.6 items/s, avg latency: 0.76 s and avg time in pipelines: 0.14 s ... : Dumping Scrapy stats:... 'geo_pipeline/misses': 35, 'item_scraped_count': 1019,
可以看到,爬取延时最初由于填充缓存的原因非常高,但是很快就回到了之前的值。统计显示总共有35次未命中,这正是我们所用的示例数据集内不同位置的数量。显然,在本例中总共有1019 - 35 = 984次命中缓存。如果使用真实的Google API,并将每秒对API的请求数量稍微增加,比如通过将Throttler(5)改为Throttler(10),把每秒请求数从5增加到10,就会在geo_pipeline/retries统计中得到重试的记录。如果发生任何错误,比如使用API无法找到一个位置,将会抛出异常,并且会在geo_pipeline/errors统计中被捕获到。如果某个位置的坐标已经被设置(后面的小节中看到),则会在geo_pipeline/already_set统计中显示。最后,当访问http://localhost:9200/ properties/ property/_search,查看房产信息的ES时,可以看到包含坐标位置值的条目,比如{..."location": {"lat": 51.5269736, "lon": -0.0667204}...},这和我们所期望的一样(在运行之前清理集合,确保看到的不是旧值)。
9.1.4 在Elasticsearch中启用地理编码索引
既然已经拥有了坐标位置,现在就可以做一些事情了,比如根据距离对结果进行排序。下面是一个HTTP POST请求(使用curl执行),返回标题中包含"Angel"的房产信息,并按照它们与点{51.54, -0.19}的距离进行排序。
$ curl http://es:9200/properties/property/_search -d '{ "query" : {"term" : { "title" : "angel" } }, "sort": [{"_geo_distance": { "location": {"lat": 51.54, "lon": -0.19}, "order": "asc", "unit": "km", "distance_type": "plane" }}]}'
唯一的问题是当尝试运行它时,会发现运行失败,并得到了一个错误信息:"failed to find mapper for [location] for geo distance based sort"。这说明位置字段并不是执行空间操作的适当格式。要想设置为合适的类型,则需要手动重写其默认类型。首先,将其自动检测的映射关系保存到文件中。
$ curl 'http://es:9200/properties/_mapping/property' > property.txt
然后编辑property.txt的如下代码。
"location":{"properties":{"lat":{"type":"double"},"lon":{"type":"d ouble"}}}
将该行的代码修改为如下代码。
"location": {"type": "geo_point"}
另外,我们还删除了文件尾部的{"properties":{"mappings": and two }}。对该文件的修改到此为止。现在可以按如下代码删除旧类型,使用指定的模式创建新类型。
$ curl -XDELETE 'http://es:9200/properties' $ curl -XPUT 'http://es:9200/properties' $ curl -XPUT 'http://es:9200/properties/_mapping/property' --data @property.txt
现在可以再次运行该爬虫,并且可以重新运行本节前面的curl命令,此时将会得到按照距离排序的结果。我们的搜索返回了房产信息的JSON,额外包含了一个sort字段,该字段的值是到搜索点的距离,单位为千米。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论