- Preface 前言
- 第1章 引论
- 第2章 编程惯用法
- 第3章 基础语法
- 建议19:有节制地使用 from…import 语句
- 建议20:优先使用 absolute import 来导入模块
- 建议21:i+=1 不等于 ++i
- 建议22:使用 with 自动关闭资源
- 建议23:使用 else 子句简化循环(异常处理)
- 建议24:遵循异常处理的几点基本原则
- 建议25:避免 finally 中可能发生的陷阱
- 建议26:深入理解 None 正确判断对象是否为空
- 建议27:连接字符串应优先使用 join 而不是 +
- 建议28:格式化字符串时尽量使用 .format 方式而不是 %
- 建议29:区别对待可变对象和不可变对象
- 建议30:[]、() 和 {}:一致的容器初始化形式
- 建议31:记住函数传参既不是传值也不是传引用
- 建议32:警惕默认参数潜在的问题
- 建议33:慎用变长参数
- 建议34:深入理解 str() 和 repr() 的区别
- 建议35:分清 staticmethod 和 classmethod 的适用场景
- 第4章 库
- 建议36:掌握字符串的基本用法
- 建议37:按需选择 sort() 或者 sorted()
- 建议38:使用 copy 模块深拷贝对象
- 建议39:使用 Counter 进行计数统计
- 建议40:深入掌握 ConfigParser
- 建议41:使用 argparse 处理命令行参数
- 建议42:使用 pandas 处理大型 CSV 文件
- 建议43:一般情况使用 ElementTree 解析 XML
- 建议44:理解模块 pickle 优劣
- 建议45:序列化的另一个不错的选择 JSON
- 建议46:使用 traceback 获取栈信息
- 建议47:使用 logging 记录日志信息
- 建议48:使用 threading 模块编写多线程程序
- 建议49:使用 Queue 使多线程编程更安全
- 第5章 设计模式
- 第6章 内部机制
- 建议54:理解 built-in objects
- 建议55:init() 不是构造方法
- 建议56:理解名字查找机制
- 建议57:为什么需要 self 参数
- 建议58:理解 MRO 与多继承
- 建议59:理解描述符机制
- 建议60:区别 getattr() 和 getattribute() 方法
- 建议61:使用更为安全的 property
- 建议62:掌握 metaclass
- 建议63:熟悉 Python 对象协议
- 建议64:利用操作符重载实现中缀语法
- 建议65:熟悉 Python 的迭代器协议
- 建议66:熟悉 Python 的生成器
- 建议67:基于生成器的协程及 greenlet
- 建议68:理解 GIL 的局限性
- 建议69:对象的管理与垃圾回收
- 第7章 使用工具辅助项目开发
- 第8章 性能剖析与优化
建议89:使用线程池提高效率
我们知道线程的生命周期分为5个状态:创建、就绪、运行、阻塞和终止。自线程创建到终止,线程便不断在运行、就绪和阻塞这3个状态之间转换直至销毁。而真正占有CPU的只有运行、创建和销毁这3个状态。一个线程的运行时间由此可以分为3部分:线程的启动时间(Ts)、线程体的运行时间(Tr)以及线程的销毁时间(Td)。在多线程处理的情景中,如果线程不能够被重用,就意味着每次创建都需要经过启动、销毁和运行这3个过程。这必然会增加系统的相应时间,降低效率。而线程体的运行时间Tr不可控制,在这种情况下如何提高线程运行的效率呢?线程池便是一个解决方案。
线程池的基本原理如图8-6所示,它通过将事先创建多个能够执行任务的线程放入池中,所需要执行的任务通常被安排在队列中。通常情况下,需要处理的任务比线程数目要多,线程执行完当前任务后,会从队列中取下一个任务,直到所有的任务已经完成。
图8-6 线程池的基本原理
由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,带来更好的性能和系统稳定性。线程池技术适合处理突发性大量请求或者需要大量线程来完成任务、但任务实际处理时间较短的应用场景,它能有效避免由于系统中创建线程过多而导致的系统性能负荷过大、响应过慢等问题。
在Python中利用线程池有两种解决方案:一是自己实现线程池模式,二是使用线程池模块。我们先来看一个线程池模式的简单实现。
import Queue,sys,threading import urllib2,os # 处理request 的工作线程 class Worker(threading.Thread): def __init__( self, workQueue, resultQueue, **kwds): threading.Thread.__init__( self, **kwds ) self.setDaemon( True ) self.workQueue = workQueue self.resultQueue = resultQueue def run( self ): while True: try: callable, args, kwds = self.workQueue.get(False)# 从队列中取出一个任务 res = callable(*args, **kwds) self.resultQueue.put( res ) # 存放处理结果到队列中 except Queue.Empty: break class WorkerManager: # 线程池管理器 def __init__( self, num_of_workers=10): self.workQueue = Queue.Queue() # 请求队列 self.resultQueue = Queue.Queue() # 输出结果的队列 self.workers = [] self._recruitThreads( num_of_workers ) def _recruitThreads( self, num_of_workers ): for i in range( num_of_workers ): worker = Worker( self.workQueue, self.resultQueue )# 创建工作线程 self.workers.append(worker) # 加入线程队列中 def start(self): # 启动线程 for w in self.workers: w.start() def wait_for_complete( self): while len(self.workers): worker = self.workers.pop() # 从池中取出一个线程处理请求 worker.join( ) if worker.isAlive() and not self.workQueue.empty(): self.workers.append( worker ) # 重新加入线程池中 print "All jobs were completed." def add_job( self, callable, *args, **kwds ): self.workQueue.put( (callable, args, kwds) ) # 往工作队列中加入请求 def get_result( self, *args, **kwds ): # 获取处理结果 return self.resultQueue.get( *args, **kwds ) def download_file(url): print "begin download",url urlhandler = urllib2.urlopen(url) fname = os.path.basename(url)+".html" with open(fname, "wb") as f: while True: chunk = urlhandler.read(1024) if not chunk: break f.write(chunk) urls = ["http://wiki.python.org/moin/WebProgramming", "https://www.createspace.com/3611970", "http://wiki.python.org/moin/Documentation" ] wm = WorkerManager(2) # 创建线程池 for i in urls: wm.add_job( download_file, i ) # 将所有请求加入队列中 wm.start() wm.wait_for_complete()
自行实现线程池,需要定义一个Worker处理工作请求,定义WorkerManage来进行线程池的管理和创建,它包含一个工作请求列队和执行结果列队,具体的下载工作通过download_file()方法来实现。
相比自己实现的线程池模型,使用现成的线程池模块往往更简单。Python中线程池模块的下载地址为:https://pypi.python.org/pypi/threadpool。该模块提供了以下基本类和方法。
1)threadpool.ThreadPool:线程池类,主要的作用是用来分派任务请求和收集运行结果。主要有以下方法。
__init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):建立线程池,并启动对应 num_workers 的线程;q_size表示任务请求队列的大小,resq_size表示存放运行结果队列的大小。
createWorkers(self, num_workers, poll_timeout=5):将num_workers数量对应的线程加入线程池中。
dismissWorkers(self, num_workers, do_join=False):告诉num_workers数量的工作线程在执行完当前任务后退出。
joinAllDismissedWorkers(self):在设置为退出的线程上执行Thread.join。
putRequest(self, request, block=True, timeout=None):将工作请求放入队列中。
poll(self, block=False):处理任务队列中新的请求。
wait(self):阻塞用于等待所有执行结果。注意当所有执行结果返回后,线程池内部的线程并没有销毁,而在等待新的任务。因此,wait()之后仍然可以再次调用pool.putRequests()往其中添加任务。
2)threadpool.WorkRequest:包含有具体执行方法的工作请求类。
3)threadpool.WorkerThread:处理任务的工作线程,主要有run()方法以及dismiss()方法。
4)makeRequests(callable_,args_list,callback=None,exc_callback=_handle_thread_exception):主要的函数,作用是创建具有相同的执行函数但参数不同的一系列工作请求。
最后看一个例子,将上一节多线程下载的例子改为用线程池来实现。
import urllib2 import os import time import threadpool def download_file(url): print "begin download",url urlhandler = urllib2.urlopen(url) fname = os.path.basename(url)+".html" with open(fname, "wb") as f: while True: chunk = urlhandler.read(1024) if not chunk: break f.write(chunk) urls = ["http://wiki.python.org/moin/WebProgramming", "https://www.createspace.com/3611970", "http://wiki.python.org/moin/Documentation" ] pool_size = 2 pool = threadpool.ThreadPool(pool_size) # 创建线程池,大小为2 requests = threadpool.makeRequests(download_file, urls) # 创建工作请求 [pool.putRequest(req) for req in requests] print "putting request to pool" pool.putRequest(threadpool.WorkRequest(download_file,args=["http://chrisarndt. de/projects/threadpool/api/",])) # 将具体的请求放入线程池 pool.putRequest(threadpool.WorkRequest(download_file,args=["https://pypi.python. org/pypi/threadpool",])) pool.poll() # 处理任务队列中的新的请求 pool.wait() print "destory all threads before exist" pool.dismissWorkers(pool_size, do_join=True) # 完成后退出
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论