返回介绍

建议89:使用线程池提高效率

发布于 2024-01-30 22:19:09 字数 5408 浏览 0 评论 0 收藏 0

我们知道线程的生命周期分为5个状态:创建、就绪、运行、阻塞和终止。自线程创建到终止,线程便不断在运行、就绪和阻塞这3个状态之间转换直至销毁。而真正占有CPU的只有运行、创建和销毁这3个状态。一个线程的运行时间由此可以分为3部分:线程的启动时间(Ts)、线程体的运行时间(Tr)以及线程的销毁时间(Td)。在多线程处理的情景中,如果线程不能够被重用,就意味着每次创建都需要经过启动、销毁和运行这3个过程。这必然会增加系统的相应时间,降低效率。而线程体的运行时间Tr不可控制,在这种情况下如何提高线程运行的效率呢?线程池便是一个解决方案。

线程池的基本原理如图8-6所示,它通过将事先创建多个能够执行任务的线程放入池中,所需要执行的任务通常被安排在队列中。通常情况下,需要处理的任务比线程数目要多,线程执行完当前任务后,会从队列中取下一个任务,直到所有的任务已经完成。

图8-6 线程池的基本原理

由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,带来更好的性能和系统稳定性。线程池技术适合处理突发性大量请求或者需要大量线程来完成任务、但任务实际处理时间较短的应用场景,它能有效避免由于系统中创建线程过多而导致的系统性能负荷过大、响应过慢等问题。

在Python中利用线程池有两种解决方案:一是自己实现线程池模式,二是使用线程池模块。我们先来看一个线程池模式的简单实现。

import Queue,sys,threading
import urllib2,os
# 
处理request
的工作线程
class Worker(threading.Thread): 
  def __init__( self, workQueue, resultQueue, **kwds): 
    threading.Thread.__init__( self, **kwds ) 
    self.setDaemon( True ) 
    self.workQueue = workQueue 
    self.resultQueue = resultQueue 
  def run( self ): 
    while True: 
        try: 
        callable, args, kwds = self.workQueue.get(False)#
从队列中取出一个任务
        res = callable(*args, **kwds) 
        self.resultQueue.put( res )           #
存放处理结果到队列中
      except Queue.Empty: 
        break        
class WorkerManager:                      #
线程池管理器
  def __init__( self, num_of_workers=10): 
    self.workQueue = Queue.Queue()            #
请求队列
    self.resultQueue = Queue.Queue()            #
输出结果的队列
    self.workers = [] 
    self._recruitThreads( num_of_workers ) 
  def _recruitThreads( self, num_of_workers ): 
    for i in range( num_of_workers ): 
      worker = Worker( self.workQueue, self.resultQueue )#
创建工作线程
      self.workers.append(worker)            #
加入线程队列中 
  def start(self):                       #
启动线程
    for w in self.workers: 
      w.start() 
  def wait_for_complete( self): 
    while len(self.workers): 
      worker = self.workers.pop()            #
从池中取出一个线程处理请求
      worker.join( ) 
      if worker.isAlive() and not self.workQueue.empty(): 
        self.workers.append( worker )          #
重新加入线程池中
    print "All jobs were completed." 
  def add_job( self, callable, *args, **kwds ): 
    self.workQueue.put( (callable, args, kwds) )       #
往工作队列中加入请求
  def get_result( self, *args, **kwds ):           #
获取处理结果
    return self.resultQueue.get( *args, **kwds )
  def download_file(url):
    print "begin download",url
    urlhandler = urllib2.urlopen(url)
    fname = os.path.basename(url)+".html"
    with open(fname, "wb") as f:
      while True:
         chunk = urlhandler.read(1024)
        if not chunk: break
        f.write(chunk)
urls = ["http://wiki.python.org/moin/WebProgramming",
        "https://www.createspace.com/3611970",
        "http://wiki.python.org/moin/Documentation"
]
wm = WorkerManager(2)                       #
创建线程池
for i in urls:
    wm.add_job( download_file, i )              #
将所有请求加入队列中
wm.start() 
wm.wait_for_complete()

自行实现线程池,需要定义一个Worker处理工作请求,定义WorkerManage来进行线程池的管理和创建,它包含一个工作请求列队和执行结果列队,具体的下载工作通过download_file()方法来实现。

相比自己实现的线程池模型,使用现成的线程池模块往往更简单。Python中线程池模块的下载地址为:https://pypi.python.org/pypi/threadpool。该模块提供了以下基本类和方法。

1)threadpool.ThreadPool:线程池类,主要的作用是用来分派任务请求和收集运行结果。主要有以下方法。

__init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):建立线程池,并启动对应 num_workers 的线程;q_size表示任务请求队列的大小,resq_size表示存放运行结果队列的大小。

createWorkers(self, num_workers, poll_timeout=5):将num_workers数量对应的线程加入线程池中。

dismissWorkers(self, num_workers, do_join=False):告诉num_workers数量的工作线程在执行完当前任务后退出。

joinAllDismissedWorkers(self):在设置为退出的线程上执行Thread.join。

putRequest(self, request, block=True, timeout=None):将工作请求放入队列中。

poll(self, block=False):处理任务队列中新的请求。

wait(self):阻塞用于等待所有执行结果。注意当所有执行结果返回后,线程池内部的线程并没有销毁,而在等待新的任务。因此,wait()之后仍然可以再次调用pool.putRequests()往其中添加任务。

2)threadpool.WorkRequest:包含有具体执行方法的工作请求类。

3)threadpool.WorkerThread:处理任务的工作线程,主要有run()方法以及dismiss()方法。

4)makeRequests(callable_,args_list,callback=None,exc_callback=_handle_thread_exception):主要的函数,作用是创建具有相同的执行函数但参数不同的一系列工作请求。

最后看一个例子,将上一节多线程下载的例子改为用线程池来实现。

import urllib2
import os
import time 
import threadpool 
def download_file(url):
     print "begin download",url
     urlhandler = urllib2.urlopen(url)
     fname = os.path.basename(url)+".html"
     with open(fname, "wb") as f:
         while True:
              chunk = urlhandler.read(1024)
              if not chunk: break
              f.write(chunk)
urls = ["http://wiki.python.org/moin/WebProgramming",
         "https://www.createspace.com/3611970",
         "http://wiki.python.org/moin/Documentation"
]
pool_size = 2
pool = threadpool.ThreadPool(pool_size)         #
创建线程池,大小为2
requests = threadpool.makeRequests(download_file, urls) #
创建工作请求
[pool.putRequest(req) for req in requests] 
print "putting request to pool"
pool.putRequest(threadpool.WorkRequest(download_file,args=["http://chrisarndt.
  de/projects/threadpool/api/",]))          #
将具体的请求放入线程池
pool.putRequest(threadpool.WorkRequest(download_file,args=["https://pypi.python.
  org/pypi/threadpool",]))
pool.poll()                       #
处理任务队列中的新的请求
pool.wait() 
print "destory all threads before exist"
pool.dismissWorkers(pool_size, do_join=True)      #
完成后退出

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文