- Preface 前言
- 第1章 引论
- 第2章 编程惯用法
- 第3章 基础语法
- 建议19:有节制地使用 from…import 语句
- 建议20:优先使用 absolute import 来导入模块
- 建议21:i+=1 不等于 ++i
- 建议22:使用 with 自动关闭资源
- 建议23:使用 else 子句简化循环(异常处理)
- 建议24:遵循异常处理的几点基本原则
- 建议25:避免 finally 中可能发生的陷阱
- 建议26:深入理解 None 正确判断对象是否为空
- 建议27:连接字符串应优先使用 join 而不是 +
- 建议28:格式化字符串时尽量使用 .format 方式而不是 %
- 建议29:区别对待可变对象和不可变对象
- 建议30:[]、() 和 {}:一致的容器初始化形式
- 建议31:记住函数传参既不是传值也不是传引用
- 建议32:警惕默认参数潜在的问题
- 建议33:慎用变长参数
- 建议34:深入理解 str() 和 repr() 的区别
- 建议35:分清 staticmethod 和 classmethod 的适用场景
- 第4章 库
- 建议36:掌握字符串的基本用法
- 建议37:按需选择 sort() 或者 sorted()
- 建议38:使用 copy 模块深拷贝对象
- 建议39:使用 Counter 进行计数统计
- 建议40:深入掌握 ConfigParser
- 建议41:使用 argparse 处理命令行参数
- 建议42:使用 pandas 处理大型 CSV 文件
- 建议43:一般情况使用 ElementTree 解析 XML
- 建议44:理解模块 pickle 优劣
- 建议45:序列化的另一个不错的选择 JSON
- 建议46:使用 traceback 获取栈信息
- 建议47:使用 logging 记录日志信息
- 建议48:使用 threading 模块编写多线程程序
- 建议49:使用 Queue 使多线程编程更安全
- 第5章 设计模式
- 第6章 内部机制
- 建议54:理解 built-in objects
- 建议55:init() 不是构造方法
- 建议56:理解名字查找机制
- 建议57:为什么需要 self 参数
- 建议58:理解 MRO 与多继承
- 建议59:理解描述符机制
- 建议60:区别 getattr() 和 getattribute() 方法
- 建议61:使用更为安全的 property
- 建议62:掌握 metaclass
- 建议63:熟悉 Python 对象协议
- 建议64:利用操作符重载实现中缀语法
- 建议65:熟悉 Python 的迭代器协议
- 建议66:熟悉 Python 的生成器
- 建议67:基于生成器的协程及 greenlet
- 建议68:理解 GIL 的局限性
- 建议69:对象的管理与垃圾回收
- 第7章 使用工具辅助项目开发
- 第8章 性能剖析与优化
建议88:使用 multiprocessing 克服 GIL 的缺陷
众所周知,GIL的存在使得Python中的多线程无法充分利用多核的优势来提高性能,这个问题困扰着很多开发者,也使很多人备受打击。一些人甚至提出质疑要求移去GIL,但由于种种原因目前还没有明确的迹象表明GIL会在随后的版本中消失。为了能够充分利用多核优势,Python的专家们提供了另外一个解决方案:多进程。Multiprocessing由此而生,它是Python中的多进程管理包,在Python2.6版本中引进的,主要用来帮助处理进程的创建以及它们之间的通信和相互协调。它主要解决了两个问题:一是尽量缩小平台之间的差异,提供高层次的API从而使得使用者忽略底层IPC的问题;二是提供对复杂对象的共享支持,支持本地和远程并发。
类Process是multiprocessing中较为重要的一个类,用于创建进程,其构造函数如下:
Process([group [, target [, name [, args [, kwargs]]]]])
其中,参数target表示可调用对象;args表示调用对象的位置参数元组;kwargs表示调用对象的字典;name为进程的名称;group一般设置为None。该类提供的方法与属性基本上与threading.Thread类一致,包括is_alive()、join([timeout])、run()、start()、terminate()、daemon(要通过start()设置)、exitcode、name、pid等。
不同于线程,每个进程都有其独立的地址空间,进程间的数据空间也相互独立,因此进程之间数据的共享和传递不如线程来得方便。庆幸的是multiprocessing模块中都提供了相应的机制:如进程间同步操作原语Lock、Event、Condition、Semaphore,传统的管道通信机制pipe以及队列Queue,用于共享资源的multiprocessing.Value和multiprocessing.Array以及Manager等。
Multiprocessing模块在使用上需要注意以下几个要点:
1)进程之间的通信优先考虑Pipe和Queue,而不是Lock、Event、Condition、Semaphore等同步原语。进程中的类Queue使用pipe和一些locks、semaphores原语来实现,是进程安全的。该类的构造函数返回一个进程的共享队列,其支持的方法和线程中的Queue基本类似,除了方法task_done()和join()是在其子类JoinableQueue中实现的以外。需要注意的是,由于底层使用pipe来实现,使用Queue进行进程之间的通信的时候,传输的对象必须是可以序列化的,否则put操作会导致PicklingError。此外,为了提供put方法的超时控制,Queue并不是直接将对象写到管道中而是先写到一个本地的缓存中,再将其从缓存中放入pipe中,内部有个专门的线程feeder负责这项工作。由于feeder的存在,Queue还提供了以下特殊方法来处理进程退出时缓存中仍然存在数据的问题。
close():表明不再存放数据到queue中。一旦所有缓冲的数据刷新到管道,后台线程将退出。
join_thread():一般在close方法之后使用,它会阻止直到的后台线程退出,确保所有缓冲区中的数据已经刷新到管道中。
cancel_join_thread():需要立即退出当前进程,而无需等待排队的数据刷新到底层的管道的时候可以使用该方法,表明无需阻止到后台线程的退出。
Multiprocessing中还有个SimpleQueue队列,它是实现了锁机制的pipe,内部去掉了buffer,但没有提供put和get的超时处理,两个动作都是阻塞的。
除了multiprocessing.Queue之外,另一种很重要的通信方式是multiprocessing.Pipe。它的构造函数为multiprocessing.Pipe([duplex]),其中duplex默认为True,表示为双向管道,否则为单向。它返回一个Connection对象的组(conn1,conn2),分别代表管道的两端。Pipe不支持进程安全,因此当有多个进程同时对管道的一端进行读操作或者写操作的时候可能会导致数据丢失或者损坏。因此在进程通信的时候,如果是超过2个以上进程,可以使用queue,但对于两个进程之间的通信而言Pipe性能更快。下面看一个示例:
from multiprocessing import Process, Pipe,Queue import time def reader_pipe(pipe): output_p, input_p = pipe # 返回管道的两端 input_p.close() while True: try: msg = output_p.recv() # 从pipe 中读取消息 except EOFError: break def writer_pipe(count, input_p): # 写消息到管道中 for i in xrange(0, count): input_p.send(i) # 发送消息 def reader_queue(queue): # 利用队列来发送消息 while True: msg = queue.get() # 从队列中获取元素 if (msg == 'DONE'): break def writer_queue(count, queue): for ii in xrange(0, count): queue.put(ii) # 放入消息队列中 queue.put('DONE') if __name__=='__main__': print "testing for pipe:" for count in [10**3, 10**4, 10**5]: output_p, input_p = Pipe() reader_p = Process(target=reader_pipe, args=((output_p, input_p),)) reader_p.start() # 启动进程 output_p.close() _start = time.time() writer_pipe(count, input_p) # 写消息到管道中 input_p.close() reader_p.join() # 等待进程处理完毕 print "Sending %s numbers to Pipe() took %s seconds" % (count, (time.time() - _start)) print "testing for queue:" for count in [10**3, 10**4, 10**5]: queue = Queue() # 利用queue 进行通信 reader_p = Process(target=reader_queue, args=((queue),)) reader_p.daemon = True reader_p.start() _start = time.time() writer_queue(count, queue) # 写消息到queue 中 reader_p.join() print "Sending %s numbers to Queue() took %s seconds" % (count, (time.time() - _start)) 输出比较: testing for pipe: Sending 1000 numbers to Pipe() took 0.15299987793 seconds Sending 10000 numbers to Pipe() took 0.384999990463 seconds Sending 100000 numbers to Pipe() took 2.09099984169 seconds testing for queue: Sending 1000 numbers to Queue() took 0.169000148773 seconds Sending 10000 numbers to Queue() took 0.555000066757 seconds Sending 100000 numbers to Queue() took 3.0790002346 seconds
上面的代码分别用来测试两个多线程的情况下使用pipe和queue进行通信发送相同数据的时候的性能,其中与pipe相关的函数为reader_pipe()和writer_pipe(),而与queue相关的主要函数为writer_queue()和reader_queue()。从函数输出可以看出,pipe所消耗的时间较小,性能更好。
2)尽量避免资源共享。相比于线程,进程之间资源共享的开销较大,因此要尽量避免资源共享。但如果不可避免,可以通过multiprocessing.Value和multiprocessing.Array或者multiprocessing.sharedctypes来实现内存共享,也可以通过服务器进程管理器Manager()来实现数据和状态的共享。这两种方式各有优势,总体来说共享内存的方式更快,效率更高,但服务器进程管理器Manager()使用起来更加方便,并且支持本地和远程内存共享。我们通过几个例子来看一下各自使用需要注意的问题。
示例一:使用Value进行内存共享。
import time from multiprocessing import Process, Value def func(val): # 多个进程同时修改val for i in range(10): time.sleep(0.1) val.value += 1 if __name__ == '__main__': v = Value('i', 0) # 使用value 来共享内存 processList = [Process(target=func, args=(v,)) for i in range(10)] for p in processList: p.start() for p in processList: p.join() print v.value
上面的程序输出是多少?100对吗?你可以运行看看。Python官方文档中有个容易让人迷惑的描述:在Value的构造函数multiprocessing.Value(typecode_or_type,*args[,lock])中,如果lock的值为True会创建一个锁对象用于同步访问控制,该值默认为True。因此很多人会以为Value是进程安全的,实际上要真正控制同步访问,需要实现获取这个锁。因此上面的例子要保证每次运行都输出100,需要将函数func修改如下:
def func(val): for i in range(10): time.sleep(0.1) with val.get_lock(): # 仍然需要使用get_lock 方法来获取锁对象 val.value += 1
示例二:使用Manager进行内存共享。
import multiprocessing def f(ns): ns.x.append(1) ns.y.append('a') if __name__ == '__main__': manager = multiprocessing.Manager() ns = manager.Namespace() ns.x = [] #manager 内部包括可变对象 ns.y = [] print 'before process operation:', ns p = multiprocessing.Process(target=f, args=(ns,)) p.start() p.join() print 'after process operation', ns # 修改根本不会生效
本意是希望x=[1],y=[‘a’],程序输出是不是期望的结果呢?答案是否定的。这又是为什么呢?这是因为manager对象仅能传播对一个可变对象本身所做的修改,如有一个manager.list()对象,管理列表本身的任何更改会传播到所有其他进程。但是,如果容器对象内部还包括可修改的对象,则内部可修改对象的任何更改都不会传播到其他进程。因此,正确的处理方式应该是下面这种形式:
import multiprocessing def f(ns,x,y): x.append(1) y.append('a') ns.x= x # 将可变对象也作为参数传入 ns.y = y if __name__ == '__main__': manager = multiprocessing.Manager() ns = manager.Namespace() ns.x = [] ns.y = [] print 'before process operation:', ns p = multiprocessing.Process(target=f, args=(ns,ns.x,ns.y,)) p.start() p.join() print 'after process operation', ns
3)注意平台之间的差异。由于Linux平台使用fork()函数来创建进程,因此父进程中所有的资源,如数据结构、打开的文件或者数据库的连接都会在子进程中共享,而Windows平台中父子进程相对独立,因此为了更好地保持平台的兼容性,最好能够将相关资源对象作为子进程的构造函数的参数传递进去。因此要避免如下方式:
f = None def child(f): # do something if __name__ == '__main__': f = open(filename, mode) p = Process(target=child) p.start() p.join() 而推荐使用如下方式: def child(f): print f if __name__ == '__main__': f = open(filename,mode) p = Process(target=child,args=(f,)) # 将资源对象作为构造函数参数传入 p.start() p.join()
需要注意的是,Linux平台上multiprocessing的实现是基于C库中的fork(),所有子进程与父进程的数据是完全相同,因此父进程中所有的资源,如数据结构、打开的文件或者数据库的连接都会在子进程共享。但Windows平台上由于没有fork()函数,父子进程相对独立,因此为了保持平台的兼容性,最好在脚本中加上“if__name__==“__main__”:”的判断。这样可以避免有可能出现的RuntimeError或者死锁。
4)尽量避免使用terminate()方式终止进程,并且确保pool.map中传入的参数是可以序列化的。在下面的例子中,如果直接将一个方法作为参数传入map中,会抛出cPickle.PicklingError异常,这是因为函数和方法是不可序列化的。
def run(self): def f(x): return x*x p = Pool() return p.map(f, [1,2,3]) # 直接传入函数f cl = calculate() print cl.run() # 抛出cPickle.PicklingError 异常
一个可行的正确做法如下:
import multiprocessing def unwrap_self_f(arg, **kwarg): return calculate.f(*arg, **kwarg) # 返回一个对象 class calculate(object): def f(self,x): return x*x def run(self): p = multiprocessing.Pool() return p.map(unwrap_self_f, zip([self]*3,[1,2,3])) if __name__ == "__main__": cl = calculate() print cl.run()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论