返回介绍

4.3 多线程爬虫

发布于 2024-02-05 23:37:18 字数 6986 浏览 0 评论 0 收藏 0

现在,我们将串行下载网页的爬虫扩展成并行下载。需要注意的是,如果滥用这一功能,多线程爬虫请求内容速度过快,可能会造成服务器过载,或是IP地址被封禁。为了避免这一问题,我们的爬虫将会设置一个delay 标识,用于设定请求同一域名时的最小时间间隔。

作为本章示例的Alexa网站列表由于包含了100万个不同的域名,因而不会出现上述问题。但是,当你以后爬取同一域名下的不同网页时,就需要注意两次下载之间至少需要1秒钟的延时。

4.3.1 线程和进程如何工作

图4.2所示为一个包含有多个线程的进程的执行过程。

图4.2

当运行Python脚本或其他计算机程序时,就会创建包含有代码和状态的进程。这些进程通过计算机的一个或多个CPU来执行。不过,同一时刻每个CPU只会执行一个进程,然后在不同进程间快速切换,这样就给人以多个程序同时运行的感觉。同理,在一个进程中,程序的执行也是在不同线程间进行切换的,每个线程执行程序的不同部分。这就意味着当一个线程等待网页下载时,进程可以切换到其他线程执行,避免浪费CPU时间。因此,为了充分利用计算机中的所有资源尽可能快地下载数据,我们需要将下载分发到多个进程和线程中。

4.3.2 实现

幸运的是,在Python中实现多线程编程相对来说比较简单。我们可以保留与第1章开发的链接爬虫类似的队列结构,只是改为在多个线程中启动爬虫循环,以便并行下载这些链接。下面的代码是修改后的链接爬虫起始部分,这里把crawl 循环移到了函数内部。

import time
import threading
from downloader import Downloader
SLEEP_TIME = 1

def threaded_crawler(..., max_threads=10):
    # the queue of URL's that still need to be crawled
    crawl_queue = [seed_url]
    # the URL's that have been seen
    seen = set([seed_url])
    D = Downloader(cache=cache, delay=delay,
        user_agent=user_agent, proxies=proxies,
            num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            try:
                url = crawl_queue.pop()
            except IndexError:
                # crawl queue is empty
                break
            else:
                html = D(url)
                ...

下面是threaded_crawler 函数的剩余部分,这里在多个线程中启动了process_queue ,并等待其完成。

    threads = []
    while threads or crawl_queue:
        # the crawl is still active
        for thread in threads:
            if not thread.is_alive():
                # remove the stopped threads
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            # set daemon so main thread can exit when receives ctrl-c
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)

        # all threads have been processed
        # sleep temporarily so CPU can focus execution elsewhere
        time.sleep(SLEEP_TIME))

当有URL可爬取时,上面代码中的循环会不断创建线程,直到达到线程池的最大值。在爬取过程中,如果当前队列中没有更多可以爬取的URL时,线程会提前停止。假设我们有2个线程以及2个待下载的URL。当第一个线程完成下载时,待爬取队列为空,因此该线程退出。第二个线程稍后也完成了下载,但又发现了另一个待下载的URL。此时thread 循环注意到还有URL需要下载,并且线程数未达到最大值,因此又会创建一个新的下载线程。

对threaded_crawler 接口的测试代码可以从https://bitbucket. org/wswp/code/src/tip/chapter04/threaded_test.py 获取。现在,让我们使用如下命令,测试多线程版本链接爬虫的性能。

$ time python threaded_test.py 5
...
4m50.465s

由于我们使用了5个线程,因此下载速度几乎是串行版本的5倍。在4.4节中会对多线程性能进行更进一步的分析。

4.3.3 多进程爬虫

为了进一步改善性能,我们对多线程示例再度扩展,使其支持多进程。目前,爬虫队列都是存储在本地内存当中,其他进程都无法处理这一爬虫。为了解决该问题,需要把爬虫队列转移到MongoDB当中。单独存储队列,意味着即使是不同服务器上的爬虫也能够协同处理同一个爬虫任务。

请注意,如果想要拥有更加健壮的队列,则需要考虑使用专用的消息传输工具,比如Celery。不过,为了尽量减少本书中介绍的技术种类,我们在这里选择复用MongoDB。下面是基于MongoDB实现的队列代码。

from datetime import datetime, timedelta
from pymongo import MongoClient, errors

class MongoQueue:
    # possible states of a download
    OUTSTANDING, PROCESSING, COMPLETE = range(3)

    def __init__(self, client=None, timeout=300):
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __nonzero__(self):
        """Returns True if there are more jobs to process
        """
        record = self.db.crawl_queue.find_one(
            {'status': {'$ne': self.COMPLETE}}
        )
        return True if record else False

    def push(self, url):
        """Add new URL to queue if does not exist
        """
        try:
            self.db.crawl_queue.insert({'_id': url, 'status':
                self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            pass # this is already in the queue

    def pop(self):
        """Get an outstanding URL from the queue and set its
            status to processing. If the queue is empty a KeyError
                exception is raised.
        """
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.OUTSTANDING},
            update={'$set': {'status': self.PROCESSING,
                'timestamp': datetime.now()}}
        )
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def complete(self, url):
        self.db.crawl_queue.update({'_id': url}, {'$set':
            {'status': self.COMPLETE}})

    def repair(self):
        """Release stalled jobs
        """
        record = self.db.crawl_queue.find_and_modify(
            query={
                'timestamp': {'$lt': datetime.now() -
                    timedelta(seconds=self.timeout)},
                'status': {'$ne': self.COMPLETE}
            },
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print 'Released:', record['_id']

上面代码中的队列定义了3种状态:OUTSTANDING 、PROCESSING 和COMPLETE 。当添加一个新URL时,其状态为OUTSTANDING ;当URL从队列中取出准备下载时,其状态为PROCESSING ;当下载结束后,其状态为COMPLETE 。该实现中,大部分代码都在关注从队列中取出的URL无法正常完成时的处理,比如处理URL的进程被终止的情况。为了避免丢失这些URL的结果,该类使用了一个timeout 参数,其默认值为300秒。在repair() 方法中,如果某个URL的处理时间超过了这个timeout 值,我们就认定处理过程出现了错误,URL的状态将被重新设为OUTSTANDING ,以便再次处理。

为了支持这个新的队列类型,还需要对多线程爬虫的代码进行少量修改,下面的代码中已经对修改部分进行了加粗处理。

def threaded_crawler(...):
    ...
    # the queue of URL's that still need to be crawled
        crawl_queue = MongoQueue()
        crawl_queue.push(seed_url)
 
    def process_queue():
        while True:
            # keep track that are processing url
            try:
                url = crawl_queue.pop()
            except KeyError:
                # currently no urls to process
                break
            else:
                ...
                crawl_queue.complete(url)

第一个改动是将Python内建队列替换成基于MongoDB的新队列,这里将其命名为MongoQueue 。由于该队列会在内部实现中处理重复URL的问题,因此不再需要seen 变量。最后,在URL处理结束后调用complete() 方法,用于记录该URL已经被成功解析。

更新后的多线程爬虫还可以启动多个进程,如下面的代码所示。

import multiprocessing

def process_link_crawler(args, **kwargs):
    num_cpus = multiprocessing.cpu_count()
    print 'Starting {} processes'.format(num_cpus)
    processes = []
    for i in range(num_cpus):
        p = multiprocessing.Process(target=threaded_crawler,
            args=[args], kwargs=kwargs)
        p.start()
        processes.append(p)
    # wait for processes to complete
    for p in processes:
        p.join()

这段代码的结构看起来十分熟悉,因为多进程模块和之前使用的多线程模块接口相似。这段代码中首先获取可用CPU的个数,在每个新进程中启动多线程爬虫,然后等待所有进程完成执行。

现在,让我们使用如下命令,测试多进程版本链接爬虫的性能。测试process_link_crawler 的接口和之前测试多线程爬虫时一样,可以从https://bitbucket.org/wswp/code/src/tip/chapter04/ process_test.py 获取。

$ time python process_test.py 5
Starting 2 processes
...
2m5.405s

通过脚本检测,测试服务器包含2个CPU,运行时间大约是之前使用单一进程执行多线程爬虫时的一半。在下一节中,我们将进一步研究这三种方式的相对性能。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文