7.2 控制节点
控制节点(ControlNode)主要分为URL管理器、数据存储器和控制调度器。控制调度器通过三个进程来协调URL管理器和数据存储器的工作:一个是URL管理进程,负责URL的管理和将URL传递给爬虫节点;一个是数据提取进程,负责读取爬虫节点返回的数据,将返回数据中的URL交给URL管理进程,将标题和摘要等数据交给数据存储进程;最后一个是数据存储进程,负责将数据提取进程中提交的数据进行本地存储。执行流程如图7-2所示。
图7-1 主从爬虫结构
图7-2 控制节点执行流程
7.2.1 URL管理器
参考第6章的代码,我们对URL管理器做了一些优化。我们采用set内存去重的方式,如果直接存储大量的URL链接,尤其是URL链接很长的时候,很容易造成内存溢出,所以我们将爬取过的URL进行MD5处理。字符串经过MD5处理后的信息摘要长度为128位,将生成的MD5摘要存储到set后,可以减少好几倍的内存消耗,不过Python中的MD5算法生成的是256位,取中间的128位即可。我们同时添加了save_progress和load_progress方法进行序列化的操作,将未爬取URL集合和已爬取的URL集合序列化到本地,保存当前的进度,以便下次恢复状态。URL管理器URLManager.py代码如下:
# coding:utf-8 import cPickle import hashlib class UrlManager(object): def __init__(self): self.new_urls = self.load_progress('new_urls.txt')# 未爬取URL集合 self.old_urls = self.load_progress('old_urls.txt')# 已爬取URL集合 def has_new_url(self): ''' 判断是否有未爬取的URL :return: ''' return self.new_url_size()!=0 def get_new_url(self): ''' 获取一个未爬取的URL :return: ''' new_url = self.new_urls.pop() m = hashlib.md5() m.update(new_url) self.old_urls.add(m.hexdigest()[8:-8]) return new_url def add_new_url(self,url): ''' 将新的URL添加到未爬取的URL集合中 :param url:单个URL :return: ''' if url is None: return m = hashlib.md5() m.update(url) url_md5 = m.hexdigest()[8:-8] if url not in self.new_urls and url_md5 not in self.old_urls: self.new_urls.add(url) def add_new_urls(self,urls): ''' 将新的URL添加到未爬取的URL集合中 :param urls:url集合 :return: ''' if urls is None or len(urls)==0: return for url in urls: self.add_new_url(url) def new_url_size(self): ''' 获取未爬取URL集合的大小 :return: ''' return len(self.new_urls) def old_url_size(self): ''' 获取已经爬取URL集合的大小 :return: ''' return len(self.old_urls) def save_progress(self,path,data): ''' 保存进度 :param path:文件路径 :param data:数据 :return: ''' with open(path, 'wb') as f: cPickle.dump(data, f) def load_progress(self,path): ''' 从本地文件加载进度 :param path:文件路径 :return:返回set集合 ''' print '[+] 从文件加载进度: %s' % path try: with open(path, 'rb') as f: tmp = cPickle.load(f) return tmp except: print '[!] 无进度文件, 创建: %s' % path return set()
7.2.2 数据存储器
数据存储器的内容基本上和第6章的一样,不过生成的文件按照当前时间进行命名,以避免重复,同时对文件进行缓存写入。代码如下:
# coding:utf-8 import codecs import time class DataOutput(object): def __init__(self): self.filepath='baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S", time. localtime()) ) self.output_head(self.filepath) self.datas=[] def store_data(self,data): if data is None: return self.datas.append(data) if len(self.datas)>10: self.output_html(self.filepath) def output_head(self,path): ''' 将HTML头写进去 :return: ''' fout=codecs.open(path,'w',encoding='utf-8') fout.write("<html>") fout.write("<body>") fout.write("<table>") fout.close() def output_html(self,path): ''' 将数据写入HTML文件中 :param path: 文件路径 :return: ''' fout=codecs.open(path,'a',encoding='utf-8') for data in self.datas: fout.write("<tr>") fout.write("<td>%s</td>"%data['url']) fout.write("<td>%s</td>"%data['title']) fout.write("<td>%s</td>"%data['summary']) fout.write("</tr>") self.datas.remove(data) fout.close() def ouput_end(self,path): ''' 输出HTML结束 :param path: 文件存储路径 :return: ''' fout=codecs.open(path,'a',encoding='utf-8') fout.write("</table>") fout.write("</body>") fout.write("</html>") fout.close()
7.2.3 控制调度器
控制调度器主要是产生并启动URL管理进程、数据提取进程和数据存储进程,同时维护4个队列保持进程间的通信,分别为url_queue、result_queue、conn_q、store_q。4个队列说明如下:
·url_q队列是URL管理进程将URL传递给爬虫节点的通道。
·result_q队列是爬虫节点将数据返回给数据提取进程的通道。
·conn_q队列是数据提取进程将新的URL数据提交给URL管理进程的通道。
·store_q队列是数据提取进程将获取到的数据交给数据存储进程的通道。
因为要和工作节点进行通信,所以分布式进程必不可少。参考1.4.4节中服务进程的代码(Linux版),创建一个分布式管理器,定义为start_manager方法。方法代码如下:
def start_Manager(self,url_q,result_q): ''' 创建一个分布式管理器 :param url_q: url队列 :param result_q: 结果队列 :return: ''' # 把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象, # 将Queue对象在网络中暴露 BaseManager.register('get_task_queue',callable=lambda:url_q) BaseManager.register('get_result_queue',callable=lambda:result_q) # 绑定端口8001,设置验证口令“baike”。这个相当于对象的初始化 manager=BaseManager(address=('',8001),authkey='baike') # 返回manager对象 return manager
URL管理进程将从conn_q队列获取到的新URL提交给URL管理器,经过去重之后,取出URL放入url_queue队列中传递给爬虫节点,代码如下:
def url_manager_proc(self,url_q,conn_q,root_url): url_manager = UrlManager() url_manager.add_new_url(root_url) while True: while(url_manager.has_new_url()): # 从URL管理器获取新的URL new_url = url_manager.get_new_url() # 将新的URL发给工作节点 url_q.put(new_url) print 'old_url=',url_manager.old_url_size() # 加一个判断条件,当爬取2000个链接后就关闭,并保存进度 if(url_manager.old_url_size()>2000): # 通知爬行节点工作结束 url_q.put('end') print '控制节点发起结束通知!' # 关闭管理节点,同时存储set状态 url_manager.save_progress('new_urls.txt',url_manager.new_urls) url_manager.save_progress('old_urls.txt',url_manager.old_urls) return # 将从result_solve_proc获取到的URL添加到URL管理器 try: if not conn_q.empty(): urls = conn_q.get() url_manager.add_new_urls(urls) except BaseException,e: time.sleep(0.1)# 延时休息
数据提取进程从result_queue队列读取返回的数据,并将数据中的URL添加到conn_q队列交给URL管理进程,将数据中的文章标题和摘要添加到store_q队列交给数据存储进程。代码如下:
def result_solve_proc(self,result_q,conn_q,store_q): while(True): try: if not result_q.empty(): content = result_q.get(True) if content['new_urls']=='end': # 结果分析进程接收通知然后结束 print '结果分析进程接收通知然后结束!' store_q.put('end') return conn_q.put(content['new_urls'])# url 为set 类型 store_q.put(content['data'])# 解析出来的数据为dict 类型 else: time.sleep(0.1)# 延时休息 except BaseException,e: time.sleep(0.1)# 延时休息
数据存储进程从store_q队列中读取数据,并调用数据存储器进行数据存储。代码如下:
def store_proc(self,store_q): output = DataOutput() while True: if not store_q.empty(): data = store_q.get() if data=='end': print '存储进程接受通知然后结束!' output.ouput_end(output.filepath) return output.store_data(data) else: time.sleep(0.1)
最后启动分布式管理器、URL管理进程、数据提取进程和数据存储进程,并初始化4个队列。代码如下:
if __name__=='__main__': # 初始化4个队列 url_q = Queue() result_q = Queue() store_q = Queue() conn_q = Queue() # 创建分布式管理器 node = NodeManager() manager = node.start_Manager(url_q,result_q) # 创建URL管理进程、 数据提取进程和数据存储进程 url_manager_proc = Process(target=node.url_manager_proc, args=(url_q,conn_q, 'http://baike.baidu.com/view/284853.htm',)) result_solve_proc = Process(target=node.result_solve_proc, args=(result_q, conn_q,store_q,)) store_proc = Process(target=node.store_proc, args=(store_q,)) # 启动3个进程和分布式管理器 url_manager_proc.start() result_solve_proc.start() store_proc.start() manager.get_server().serve_forever()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论