返回介绍

14.2 scrapy-redis 源码分析

发布于 2024-02-05 21:13:20 字数 11138 浏览 0 评论 0 收藏 0

在使用scrapy-redis前,我们先来分析scrapy-redis的源码,了解其实现原理。

使用git从github网站下载scrapy-redis源码:

scrapy-redis的源码并不多,因为它仅是利用Redis数据库重新实现了Scrapy中的某些组件。

对于一个分布式爬虫框架,需要解决以下两个最基本的问题。

分配爬取任务:为每个爬虫分配不重复的爬取任务。

汇总爬取数据:将所有爬虫爬取到的数据汇总到一处。

接下来我们看scrapy-redis是如何解决的。

14.2.1 分配爬取任务部分

scrapy-redis为多个爬虫分配爬取任务的方式是:让所有爬虫共享一个存在于Redis数据库中的请求队列(替代各爬虫独立的请求队列),每个爬虫从请求队列中获取请求,下载并解析页面后,将解析出的新请求再添加到请求队列中,因此每个爬虫既是下载任务的生产者又是消费者。

为实现多个爬虫的任务分配,scrapy-redis重新实现了以下组件:

基于Redis的请求队列(优先队列、FIFO、LIFO)。

基于Redis的请求去重过滤器(过滤掉重复的请求)。

基于以上两个组件的调度器。

1.调度器的实现

首先来看调度器Scheduler的实现,它位于scheduler.py中,Scheduler的核心代码如下:

import importlib
import six

from scrapy.utils.misc import load_object
from . import connection, defaults

class Scheduler(object):
 ...
 def open(self, spider):
   ...
   # 初始化请求队列
   try:
    self.queue = load_object(self.queue_cls)(
    server=self.server,
    spider=spider,
    key=self.queue_key % {'spider': spider.name},
    serializer=self.serializer,
  )
 except TypeError as e:
  raise ValueError("Failed to instantiate queue class '%s': %s",
        self.queue_cls, e)

 # 初始化去重过滤器
 try:
  self.df = load_object(self.dupefilter_cls)(
    server=self.server,
    key=self.dupefilter_key % {'spider': spider.name},
    debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
  )
 except TypeError as e:
  raise ValueError("Failed to instantiate dupefilter class '%s': %s",
        self.dupefilter_cls, e)
 ...

...
def enqueue_request(self, request):
 # 调用去重过滤器的request_seen 方法, 判断该request 对应的页面是否已经爬取过
 # 如果页面已经爬取过,且用户没有强制忽略过滤,就直接返回False
 if not request.dont_filter and self.df.request_seen(request):
  self.df.log(request, self.spider)
  return False
 if self.stats:
  self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
 # 将request 入队到请求队列
 self.queue.push(request)
 return True

def next_request(self):
 block_pop_timeout = self.idle_before_close
 # 从请求队列出队一个request
 request = self.queue.pop(block_pop_timeout)
 if request and self.stats:
  self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
 return request
...

分析上述代码如下:

调度器中最核心的两个方法是enqueue_request和next_request,它们分别对应请求的入队和出队操作。Spider提交的Request对象最终由Scrapy引擎调用enqueue_request添加到请求队列中,Scrapy引擎同时也调用next_request从请求队列中取出请求,送给下载器下载。

self.queue和self.df分别是请求队列和去重过滤器对象。在enqueue_request方法中,使用去重过滤器的request_seen方法判断request是否重复,即request对应的页面是否已经爬取过,如果用户没有强制忽略过滤,并且request是重复的,就抛弃该request,并直接返回False,否则调用self.queue的push方法将request入队,返回True。在next_request方法中,调用self.queue的pop方法出队一个request并返回。

再来看一下创建请求队列和去重过滤器对象的相关代码:

import importlib
import six

from scrapy.utils.misc import load_object
from . import connection, defaults

class Scheduler(object):
 def __init__(self, server, ...,
     queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
     dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS):
  ...
  self.server = server
  self.queue_cls = queue_cls
  self.dupefilter_cls = dupefilter_cls
  ...

 @classmethod
 def from_settings(cls, settings):
  ...
  # 从配置中读取请求队列和去重过滤器的类名
  optional = {
    ...
  'queue_cls': 'SCHEDULER_QUEUE_CLASS',
  'dupefilter_cls': 'DUPEFILTER_CLASS',
  ...
 }
 for name, setting_name in optional.items():
  val = settings.get(setting_name)
  if val:
    kwargs[name] = val
 ...
 server = connection.from_settings(settings)
 server.ping()

 return cls(server=server, **kwargs)
...

def open(self, spider):
 ...
 # 初始化请求队列
 try:
  self.queue = load_object(self.queue_cls)(
    server=self.server,
    spider=spider,
    key=self.queue_key % {'spider': spider.name},
    serializer=self.serializer,
  )
 except TypeError as e:
  raise ValueError("Failed to instantiate queue class '%s': %s",
        self.queue_cls, e)

 # 初始化去重过滤器
 try:
  self.df = load_object(self.dupefilter_cls)(
    server=self.server,
    key=self.dupefilter_key % {'spider': spider.name},
    debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
  )
 except TypeError as e:
  raise ValueError("Failed to instantiate dupefilter class '%s': %s",
   self.dupefilter_cls, e)
 ...
...

self.queue和self.df的创建是在open方法中调用load_object方法完成的,load_object方法的参数是类的导入路径(如scrapy_redis.queue.PriorityQueue),这种实现的好处是用户可以使用字符串在配置文件中灵活指定想要使用的队列类和过滤器类。self.queue_cls和self.dupefilter_cls便是从配置文件中读取的导入路径(或默认值)。

2.请求队列的实现

接下来看基于Redis的请求队列的实现。在queque.py中,包含以下3种请求队列:

PriorityQueue优先级队列(默认)

FifoQueue先进先出队列

LifoQueue后进先出队列

我们分析其中代码简短且容易理解的FifoQueue,代码如下:

class FifoQueue(Base):
 """Per-spider FIFO queue"""

 def __len__(self):
  """Return the length of the queue"""
  return self.server.llen(self.key)

 def push(self, request):
  """Push a request"""
  self.server.lpush(self.key, self._encode_request(request))

 def pop(self, timeout=0):
  """Pop a request"""
  if timeout > 0:
    # brpop: 阻塞的rpop,可以设置超时
    data = self.server.brpop(self.key, timeout)
    if isinstance(data, tuple):
      data = data[1]
  else:
    data = self.server.rpop(self.key)
  if data:
    return self._decode_request(data)

self.server是Redis数据库的连接对象(可理解为self.server = redis.StrictRedis(...)),该连接对象是在Scheduler的from_settings方法中创建的,在创建请求队列对象时,被传递给请求队列类的构造器。

观察在self.server上调用的方法可知,FifoQueue使用Redis中的一个列表实现队列,该列表在数据库中的键为self.key的值,可以通过配置文件设置(SCHEDULER_QUEUE_KEY),默认为<spider_name>:requests。

push方法对应请求的入队操作,先调用基类的_encode_request方法对request进行编码,然后调用Redis的lpush命令将其插入数据库中列表的最左端(入队)。

pop方法对应请求的出队操作,调用Redis的rpop或brpop命令从数据库中列表的最右端弹出一个经过编码的request(出队),再调用基类的_decode_request方法对其进行解码,然后返回。

__len__方法调用Redis的llen命令获取数据库中列表的长度,即请求队列的长度。

下面是FifoQueue、LifoQueue、PriorityQueue共同基类Base的部分代码:

from . import picklecompat

class Base(object):
 """Per-spider base queue class"""

 def __init__(self, server, spider, key, serializer=None):
  """Initialize per-spider redis queue.

  Parameters
  ----------
  server : StrictRedis
    Redis client instance.
  spider : Spider
    Scrapy spider instance.
  key: str
    Redis key where to put and get messages.
  serializer : object
    Serializer object with ``loads`` and ``dumps`` methods.
  """

  if serializer is None:
    serializer = picklecompat
  ...
 self.server = server
 self.spider = spider
 self.key = key % {'spider': spider.name}
 self.serializer = serializer

def _encode_request(self, request):
 """Encode a request object"""
 obj = request_to_dict(request, self.spider)
 return self.serializer.dumps(obj)

def _decode_request(self, encoded_request):
 """Decode an request previously encoded"""
 obj = self.serializer.loads(encoded_request)
 return request_from_dict(obj, self.spider)
...

可以看到,在对request进行编、解码时,调用的是self.serializer的dumps和loads方法。self.serializer同样可以通过配置文件指定(SCHEDULER_SERIALIZER),默认为Python标准库中的pickle模块。

3.去重过滤器的实现

最后来看基于Redis的去重过滤器RFPDupeFilter的实现,它位于dupefilter.py中,部分代码如下:

...
from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint
...
class RFPDupeFilter(BaseDupeFilter):
 ...
 def __init__(self, server, key, debug=False):
  """Initialize the duplicates filter.

  Parameters
  ----------
  server : redis.StrictRedis
    The redis server instance.
  key : str
  Redis key Where to store fingerprints.
 debug : bool, optional
  Whether to log filtered requests.

 """
 self.server = server
 self.key = key
 self.debug = debug
 self.logdupes = True

...
def request_seen(self, request):
 fp = self.request_fingerprint(request)
 # This returns the number of values added, zero if already exists.
 added = self.server.sadd(self.key, fp)
 return added == 0

def request_fingerprint(self, request):
 return request_fingerprint(request)
...

self.server是Redis数据库的连接对象,与FifoQueue中相同。

观察在self.server上调用的方法可知,RFPDupeFilter使用Redis中的一个集合对请求进行去重,该集合在数据库中的键为self.key的值,可以通过配置文件设置(SCHEDULER_DUPEFILTER_KEY),默认为<spider_name>:dupefilter。

request_fingerprint方法用来获取一个请求的指纹,即请求的唯一标识,请求的指纹是使用Python标准库hashlib中的sha1算法计算得到的(详见scrapy.utils.request中的request_fingerprint函数)。

request_seen方法用来判断一个请求是否是重复的,先调用request_fingerprint方法计算request的指纹,然后调用Redis的sadd命令尝试将指纹添加到数据库中的集合中,根据sadd返回值判断请求是否重复,返回相应的布尔值结果,即重复返回True,否则返回False。

14.2.2 汇总爬取数据部分

1.RedisPipeline的实现

在分布式爬虫框架中,各个主机爬取到的数据最终要汇总到一处,通常是某种数据库。scrapy-redis提供了一个Item Pipeline(RedisPipeline),用于将各个爬虫爬取到的数据存入同一个Redis数据库中。

RedisPipeline位于pipeline.py中,代码如下:

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults

default_serialize = ScrapyJSONEncoder().encode

class RedisPipeline(object):
 """Pushes serialized item into a redis list/queue

 Settings
 --------
 REDIS_ITEMS_KEY : str
  Redis key where to store items.
 REDIS_ITEMS_SERIALIZER : str
  Object path to serializer function.

 """

 def __init__(self, server,
     key=defaults.PIPELINE_KEY,
     serialize_func=default_serialize):
  """Initialize pipeline.

  Parameters
  ----------
  server : StrictRedis
    Redis client instance.
  key : str
    Redis key where to store items.
  serialize_func : callable
    Items serializer function.

  """
  self.server = server
 self.key = key
 self.serialize = serialize_func

@classmethod
def from_settings(cls, settings):
 params = {
  'server': connection.from_settings(settings),
 }
 if settings.get('REDIS_ITEMS_KEY'):
  params['key'] = settings['REDIS_ITEMS_KEY']
 if settings.get('REDIS_ITEMS_SERIALIZER'):
  params['serialize_func'] = load_object(
    settings['REDIS_ITEMS_SERIALIZER']
  )

 return cls(**params)

@classmethod
def from_crawler(cls, crawler):
 return cls.from_settings(crawler.settings)

def process_item(self, item, spider):
 return deferToThread(self._process_item, item, spider)

def _process_item(self, item, spider):
 key = self.item_key(item, spider)
 data = self.serialize(item)
 self.server.rpush(key, data)
 return item

def item_key(self, item, spider):
 """Returns redis key based on given spider.

 Override this function to use a different key depending on the item
 and/or spider.

 """
 return self.key % {'spider': spider.name}

self.server是Redis数据库的连接对象,与FifoQueue中相同。

观察在self.server上调用的方法可知,RedisPipeline使用Redis中的一个列表存储所有爬虫爬取到的数据,该列表在数据库中的键为调用item_key方法的结果,即self.key %{'spider': spider.name}。self.key可以通过配置文件设置(REDIS_ITEMS_KEY),默认情况下列表的键为<spider_name>:items。

Redis的列表只能存储字符串,而Spider爬取到的数据item的类型是Item或Python字典,所以先要将item串行化成字符串,再存入Redis列表。串行化函数也可以通过配置文件指定(REDIS_ITEMS_SERIALIZER),默认情况下item被串行化成json串。

process_item方法处理爬取到的每一项数据,因为写入数据库为I/O操作,速度较慢,所以可以在线程中执行,调用twisted中的deferToThread方法,启动线程执行_process_item方法。

_process_item方法实际处理爬取到的每一项数据,先使用self.serial函数将item串行化成字符串,再调用Redis的rpush命令将其写入数据库中的列表。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文