返回介绍

7.2 控制节点

发布于 2024-01-26 22:39:51 字数 8230 浏览 0 评论 0 收藏 0

控制节点(ControlNode)主要分为URL管理器、数据存储器和控制调度器。控制调度器通过三个进程来协调URL管理器和数据存储器的工作:一个是URL管理进程,负责URL的管理和将URL传递给爬虫节点;一个是数据提取进程,负责读取爬虫节点返回的数据,将返回数据中的URL交给URL管理进程,将标题和摘要等数据交给数据存储进程;最后一个是数据存储进程,负责将数据提取进程中提交的数据进行本地存储。执行流程如图7-2所示。

图7-1 主从爬虫结构

图7-2 控制节点执行流程

7.2.1 URL管理器

参考第6章的代码,我们对URL管理器做了一些优化。我们采用set内存去重的方式,如果直接存储大量的URL链接,尤其是URL链接很长的时候,很容易造成内存溢出,所以我们将爬取过的URL进行MD5处理。字符串经过MD5处理后的信息摘要长度为128位,将生成的MD5摘要存储到set后,可以减少好几倍的内存消耗,不过Python中的MD5算法生成的是256位,取中间的128位即可。我们同时添加了save_progress和load_progress方法进行序列化的操作,将未爬取URL集合和已爬取的URL集合序列化到本地,保存当前的进度,以便下次恢复状态。URL管理器URLManager.py代码如下:

  # coding:utf-8
  import cPickle
  import hashlib
  class UrlManager(object):
     def __init__(self):
       self.new_urls = self.load_progress('new_urls.txt')# 未爬取URL集合
       self.old_urls = self.load_progress('old_urls.txt')# 已爬取URL集合
     def has_new_url(self):
       '''
       判断是否有未爬取的URL
       :return:
       '''
       return self.new_url_size()!=0
  
     def get_new_url(self):
       '''
       获取一个未爬取的URL
       :return:
       '''
       new_url = self.new_urls.pop()
       m = hashlib.md5()
       m.update(new_url)
       self.old_urls.add(m.hexdigest()[8:-8])
       return new_url
  
     def add_new_url(self,url):
       '''
        将新的URL添加到未爬取的URL集合中
       :param url:单个URL
       :return:
       '''
       if url is None:
            return
       m = hashlib.md5()
       m.update(url)
       url_md5 =  m.hexdigest()[8:-8]
       if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)
  
     def add_new_urls(self,urls):
       '''
       将新的URL添加到未爬取的URL集合中
       :param urls:url集合
       :return:
       '''
       if urls is None or len(urls)==0:
            return
       for url in urls:
            self.add_new_url(url)
  
     def new_url_size(self):
       '''
       获取未爬取URL集合的大小
       :return:
       '''
       return len(self.new_urls)
  
     def old_url_size(self):
       '''
       获取已经爬取URL集合的大小
       :return:
       '''
       return len(self.old_urls)
  
     def save_progress(self,path,data):
       '''
       保存进度
       :param path:文件路径
       :param data:数据
       :return:
       '''
       with open(path, 'wb') as f:
            cPickle.dump(data, f)
  
     def load_progress(self,path):
       '''
       从本地文件加载进度
       :param path:文件路径
       :return:返回set集合
       '''
       print '[+] 从文件加载进度: %s' % path
       try:
            with open(path, 'rb') as f:
              tmp = cPickle.load(f)
              return tmp
       except:
            print '[!] 无进度文件, 创建: %s' % path
       return set()

7.2.2 数据存储器

数据存储器的内容基本上和第6章的一样,不过生成的文件按照当前时间进行命名,以避免重复,同时对文件进行缓存写入。代码如下:

  # coding:utf-8
  import codecs
  import time
  class DataOutput(object):
     def __init__(self):
       self.filepath='baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S", time. 
       localtime()) )
       self.output_head(self.filepath)
       self.datas=[]
     def store_data(self,data):
       if data is None:
            return
       self.datas.append(data)
       if len(self.datas)>10:
            self.output_html(self.filepath)
  
  
     def output_head(self,path):
       '''
       将HTML头写进去
       :return:
       '''
       fout=codecs.open(path,'w',encoding='utf-8')
       fout.write("<html>")
       fout.write("<body>")
       fout.write("<table>")
       fout.close()
  
     def output_html(self,path):
       '''
       将数据写入HTML文件中
       :param path: 文件路径
       :return:
       '''
       fout=codecs.open(path,'a',encoding='utf-8')
       for data in self.datas:
            fout.write("<tr>")
            fout.write("<td>%s</td>"%data['url'])
            fout.write("<td>%s</td>"%data['title'])
            fout.write("<td>%s</td>"%data['summary'])
            fout.write("</tr>")
            self.datas.remove(data)
       fout.close()
  
     def ouput_end(self,path):
       '''
       输出HTML结束
       :param path: 文件存储路径
       :return:
       '''
       fout=codecs.open(path,'a',encoding='utf-8')
       fout.write("</table>")
       fout.write("</body>")
       fout.write("</html>")
       fout.close()

7.2.3 控制调度器

控制调度器主要是产生并启动URL管理进程、数据提取进程和数据存储进程,同时维护4个队列保持进程间的通信,分别为url_queue、result_queue、conn_q、store_q。4个队列说明如下:

·url_q队列是URL管理进程将URL传递给爬虫节点的通道。

·result_q队列是爬虫节点将数据返回给数据提取进程的通道。

·conn_q队列是数据提取进程将新的URL数据提交给URL管理进程的通道。

·store_q队列是数据提取进程将获取到的数据交给数据存储进程的通道。

因为要和工作节点进行通信,所以分布式进程必不可少。参考1.4.4节中服务进程的代码(Linux版),创建一个分布式管理器,定义为start_manager方法。方法代码如下:

  def start_Manager(self,url_q,result_q):
     '''
     创建一个分布式管理器
     :param url_q: url队列
     :param result_q: 结果队列
     :return:
     '''
     # 把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象,
     # 将Queue对象在网络中暴露
     BaseManager.register('get_task_queue',callable=lambda:url_q)
     BaseManager.register('get_result_queue',callable=lambda:result_q)
     # 绑定端口8001,设置验证口令“baike”。这个相当于对象的初始化
     manager=BaseManager(address=('',8001),authkey='baike')
     # 返回manager对象
     return manager

URL管理进程将从conn_q队列获取到的新URL提交给URL管理器,经过去重之后,取出URL放入url_queue队列中传递给爬虫节点,代码如下:

def url_manager_proc(self,url_q,conn_q,root_url):
  url_manager = UrlManager()
  url_manager.add_new_url(root_url)
  while True:
    while(url_manager.has_new_url()):

      # 从URL管理器获取新的URL
      new_url = url_manager.get_new_url()
      # 将新的URL发给工作节点
      url_q.put(new_url)
      print 'old_url=',url_manager.old_url_size()
      # 加一个判断条件,当爬取2000个链接后就关闭,并保存进度
      if(url_manager.old_url_size()>2000):
        # 通知爬行节点工作结束
        url_q.put('end')
        print '控制节点发起结束通知!'
        # 关闭管理节点,同时存储set状态
        url_manager.save_progress('new_urls.txt',url_manager.new_urls)
        url_manager.save_progress('old_urls.txt',url_manager.old_urls)
        return
    # 将从result_solve_proc获取到的URL添加到URL管理器
    try:
      if not conn_q.empty():
        urls = conn_q.get()
        url_manager.add_new_urls(urls)
    except BaseException,e:
      time.sleep(0.1)# 延时休息

数据提取进程从result_queue队列读取返回的数据,并将数据中的URL添加到conn_q队列交给URL管理进程,将数据中的文章标题和摘要添加到store_q队列交给数据存储进程。代码如下:

def result_solve_proc(self,result_q,conn_q,store_q):
   while(True):
    try:
        if not result_q.empty():
          content = result_q.get(True)
          if content['new_urls']=='end':
         # 结果分析进程接收通知然后结束
         print '结果分析进程接收通知然后结束!'
         store_q.put('end')
         return
         conn_q.put(content['new_urls'])# url 为set 类型
         store_q.put(content['data'])# 解析出来的数据为dict 类型
         else:
         time.sleep(0.1)# 延时休息
      except BaseException,e:
       time.sleep(0.1)# 延时休息

数据存储进程从store_q队列中读取数据,并调用数据存储器进行数据存储。代码如下:

  def store_proc(self,store_q):
     output = DataOutput()
     while True:
       if not store_q.empty():
            data = store_q.get()
            if data=='end':
              print '存储进程接受通知然后结束!'
              output.ouput_end(output.filepath)
  
              return
            output.store_data(data)
       else:
            time.sleep(0.1)

最后启动分布式管理器、URL管理进程、数据提取进程和数据存储进程,并初始化4个队列。代码如下:

  if __name__=='__main__':
     # 初始化4个队列
     url_q = Queue()
     result_q = Queue()
     store_q = Queue()
     conn_q = Queue()
     # 创建分布式管理器
     node = NodeManager()
     manager = node.start_Manager(url_q,result_q)
     # 创建URL管理进程、 数据提取进程和数据存储进程
     url_manager_proc = Process(target=node.url_manager_proc, args=(url_q,conn_q, 
       'http://baike.baidu.com/view/284853.htm',))
     result_solve_proc = Process(target=node.result_solve_proc, args=(result_q, 
       conn_q,store_q,))
     store_proc = Process(target=node.store_proc, args=(store_q,))
     # 启动3个进程和分布式管理器
     url_manager_proc.start()
     result_solve_proc.start()
     store_proc.start()
     manager.get_server().serve_forever()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文