4.3 多线程爬虫
现在,我们将串行下载网页的爬虫扩展成并行下载。需要注意的是,如果滥用这一功能,多线程爬虫请求内容速度过快,可能会造成服务器过载,或是IP地址被封禁。为了避免这一问题,我们的爬虫将会设置一个delay 标识,用于设定请求同一域名时的最小时间间隔。
作为本章示例的Alexa网站列表由于包含了100万个不同的域名,因而不会出现上述问题。但是,当你以后爬取同一域名下的不同网页时,就需要注意两次下载之间至少需要1秒钟的延时。
4.3.1 线程和进程如何工作
图4.2所示为一个包含有多个线程的进程的执行过程。
图4.2
当运行Python脚本或其他计算机程序时,就会创建包含有代码和状态的进程。这些进程通过计算机的一个或多个CPU来执行。不过,同一时刻每个CPU只会执行一个进程,然后在不同进程间快速切换,这样就给人以多个程序同时运行的感觉。同理,在一个进程中,程序的执行也是在不同线程间进行切换的,每个线程执行程序的不同部分。这就意味着当一个线程等待网页下载时,进程可以切换到其他线程执行,避免浪费CPU时间。因此,为了充分利用计算机中的所有资源尽可能快地下载数据,我们需要将下载分发到多个进程和线程中。
4.3.2 实现
幸运的是,在Python中实现多线程编程相对来说比较简单。我们可以保留与第1章开发的链接爬虫类似的队列结构,只是改为在多个线程中启动爬虫循环,以便并行下载这些链接。下面的代码是修改后的链接爬虫起始部分,这里把crawl 循环移到了函数内部。
import time import threading from downloader import Downloader SLEEP_TIME = 1 def threaded_crawler(..., max_threads=10): # the queue of URL's that still need to be crawled crawl_queue = [seed_url] # the URL's that have been seen seen = set([seed_url]) D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, timeout=timeout) def process_queue(): while True: try: url = crawl_queue.pop() except IndexError: # crawl queue is empty break else: html = D(url) ...
下面是threaded_crawler 函数的剩余部分,这里在多个线程中启动了process_queue ,并等待其完成。
threads = [] while threads or crawl_queue: # the crawl is still active for thread in threads: if not thread.is_alive(): # remove the stopped threads threads.remove(thread) while len(threads) < max_threads and crawl_queue: # can start some more threads thread = threading.Thread(target=process_queue) # set daemon so main thread can exit when receives ctrl-c thread.setDaemon(True) thread.start() threads.append(thread) # all threads have been processed # sleep temporarily so CPU can focus execution elsewhere time.sleep(SLEEP_TIME))
当有URL可爬取时,上面代码中的循环会不断创建线程,直到达到线程池的最大值。在爬取过程中,如果当前队列中没有更多可以爬取的URL时,线程会提前停止。假设我们有2个线程以及2个待下载的URL。当第一个线程完成下载时,待爬取队列为空,因此该线程退出。第二个线程稍后也完成了下载,但又发现了另一个待下载的URL。此时thread 循环注意到还有URL需要下载,并且线程数未达到最大值,因此又会创建一个新的下载线程。
对threaded_crawler 接口的测试代码可以从https://bitbucket. org/wswp/code/src/tip/chapter04/threaded_test.py 获取。现在,让我们使用如下命令,测试多线程版本链接爬虫的性能。
$ time python threaded_test.py 5 ... 4m50.465s
由于我们使用了5个线程,因此下载速度几乎是串行版本的5倍。在4.4节中会对多线程性能进行更进一步的分析。
4.3.3 多进程爬虫
为了进一步改善性能,我们对多线程示例再度扩展,使其支持多进程。目前,爬虫队列都是存储在本地内存当中,其他进程都无法处理这一爬虫。为了解决该问题,需要把爬虫队列转移到MongoDB当中。单独存储队列,意味着即使是不同服务器上的爬虫也能够协同处理同一个爬虫任务。
请注意,如果想要拥有更加健壮的队列,则需要考虑使用专用的消息传输工具,比如Celery。不过,为了尽量减少本书中介绍的技术种类,我们在这里选择复用MongoDB。下面是基于MongoDB实现的队列代码。
from datetime import datetime, timedelta from pymongo import MongoClient, errors class MongoQueue: # possible states of a download OUTSTANDING, PROCESSING, COMPLETE = range(3) def __init__(self, client=None, timeout=300): self.client = MongoClient() if client is None else client self.db = self.client.cache self.timeout = timeout def __nonzero__(self): """Returns True if there are more jobs to process """ record = self.db.crawl_queue.find_one( {'status': {'$ne': self.COMPLETE}} ) return True if record else False def push(self, url): """Add new URL to queue if does not exist """ try: self.db.crawl_queue.insert({'_id': url, 'status': self.OUTSTANDING}) except errors.DuplicateKeyError as e: pass # this is already in the queue def pop(self): """Get an outstanding URL from the queue and set its status to processing. If the queue is empty a KeyError exception is raised. """ record = self.db.crawl_queue.find_and_modify( query={'status': self.OUTSTANDING}, update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}} ) if record: return record['_id'] else: self.repair() raise KeyError() def complete(self, url): self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}}) def repair(self): """Release stalled jobs """ record = self.db.crawl_queue.find_and_modify( query={ 'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)}, 'status': {'$ne': self.COMPLETE} }, update={'$set': {'status': self.OUTSTANDING}} ) if record: print 'Released:', record['_id']
上面代码中的队列定义了3种状态:OUTSTANDING 、PROCESSING 和COMPLETE 。当添加一个新URL时,其状态为OUTSTANDING ;当URL从队列中取出准备下载时,其状态为PROCESSING ;当下载结束后,其状态为COMPLETE 。该实现中,大部分代码都在关注从队列中取出的URL无法正常完成时的处理,比如处理URL的进程被终止的情况。为了避免丢失这些URL的结果,该类使用了一个timeout 参数,其默认值为300秒。在repair() 方法中,如果某个URL的处理时间超过了这个timeout 值,我们就认定处理过程出现了错误,URL的状态将被重新设为OUTSTANDING ,以便再次处理。
为了支持这个新的队列类型,还需要对多线程爬虫的代码进行少量修改,下面的代码中已经对修改部分进行了加粗处理。
def threaded_crawler(...): ... # the queue of URL's that still need to be crawled crawl_queue = MongoQueue() crawl_queue.push(seed_url) def process_queue(): while True: # keep track that are processing url try: url = crawl_queue.pop() except KeyError: # currently no urls to process break else: ... crawl_queue.complete(url)
第一个改动是将Python内建队列替换成基于MongoDB的新队列,这里将其命名为MongoQueue 。由于该队列会在内部实现中处理重复URL的问题,因此不再需要seen 变量。最后,在URL处理结束后调用complete() 方法,用于记录该URL已经被成功解析。
更新后的多线程爬虫还可以启动多个进程,如下面的代码所示。
import multiprocessing def process_link_crawler(args, **kwargs): num_cpus = multiprocessing.cpu_count() print 'Starting {} processes'.format(num_cpus) processes = [] for i in range(num_cpus): p = multiprocessing.Process(target=threaded_crawler, args=[args], kwargs=kwargs) p.start() processes.append(p) # wait for processes to complete for p in processes: p.join()
这段代码的结构看起来十分熟悉,因为多进程模块和之前使用的多线程模块接口相似。这段代码中首先获取可用CPU的个数,在每个新进程中启动多线程爬虫,然后等待所有进程完成执行。
现在,让我们使用如下命令,测试多进程版本链接爬虫的性能。测试process_link_crawler 的接口和之前测试多线程爬虫时一样,可以从https://bitbucket.org/wswp/code/src/tip/chapter04/ process_test.py 获取。
$ time python process_test.py 5 Starting 2 processes ... 2m5.405s
通过脚本检测,测试服务器包含2个CPU,运行时间大约是之前使用单一进程执行多线程爬虫时的一半。在下一节中,我们将进一步研究这三种方式的相对性能。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论