返回介绍

建议88:使用 multiprocessing 克服 GIL 的缺陷

发布于 2024-01-30 22:19:09 字数 7840 浏览 0 评论 0 收藏 0

众所周知,GIL的存在使得Python中的多线程无法充分利用多核的优势来提高性能,这个问题困扰着很多开发者,也使很多人备受打击。一些人甚至提出质疑要求移去GIL,但由于种种原因目前还没有明确的迹象表明GIL会在随后的版本中消失。为了能够充分利用多核优势,Python的专家们提供了另外一个解决方案:多进程。Multiprocessing由此而生,它是Python中的多进程管理包,在Python2.6版本中引进的,主要用来帮助处理进程的创建以及它们之间的通信和相互协调。它主要解决了两个问题:一是尽量缩小平台之间的差异,提供高层次的API从而使得使用者忽略底层IPC的问题;二是提供对复杂对象的共享支持,支持本地和远程并发。

类Process是multiprocessing中较为重要的一个类,用于创建进程,其构造函数如下:

Process([group [, target [, name [, args [, kwargs]]]]])

其中,参数target表示可调用对象;args表示调用对象的位置参数元组;kwargs表示调用对象的字典;name为进程的名称;group一般设置为None。该类提供的方法与属性基本上与threading.Thread类一致,包括is_alive()、join([timeout])、run()、start()、terminate()、daemon(要通过start()设置)、exitcode、name、pid等。

不同于线程,每个进程都有其独立的地址空间,进程间的数据空间也相互独立,因此进程之间数据的共享和传递不如线程来得方便。庆幸的是multiprocessing模块中都提供了相应的机制:如进程间同步操作原语Lock、Event、Condition、Semaphore,传统的管道通信机制pipe以及队列Queue,用于共享资源的multiprocessing.Value和multiprocessing.Array以及Manager等。

Multiprocessing模块在使用上需要注意以下几个要点:

1)进程之间的通信优先考虑Pipe和Queue,而不是Lock、Event、Condition、Semaphore等同步原语。进程中的类Queue使用pipe和一些locks、semaphores原语来实现,是进程安全的。该类的构造函数返回一个进程的共享队列,其支持的方法和线程中的Queue基本类似,除了方法task_done()和join()是在其子类JoinableQueue中实现的以外。需要注意的是,由于底层使用pipe来实现,使用Queue进行进程之间的通信的时候,传输的对象必须是可以序列化的,否则put操作会导致PicklingError。此外,为了提供put方法的超时控制,Queue并不是直接将对象写到管道中而是先写到一个本地的缓存中,再将其从缓存中放入pipe中,内部有个专门的线程feeder负责这项工作。由于feeder的存在,Queue还提供了以下特殊方法来处理进程退出时缓存中仍然存在数据的问题。

close():表明不再存放数据到queue中。一旦所有缓冲的数据刷新到管道,后台线程将退出。

join_thread():一般在close方法之后使用,它会阻止直到的后台线程退出,确保所有缓冲区中的数据已经刷新到管道中。

cancel_join_thread():需要立即退出当前进程,而无需等待排队的数据刷新到底层的管道的时候可以使用该方法,表明无需阻止到后台线程的退出。

Multiprocessing中还有个SimpleQueue队列,它是实现了锁机制的pipe,内部去掉了buffer,但没有提供put和get的超时处理,两个动作都是阻塞的。

除了multiprocessing.Queue之外,另一种很重要的通信方式是multiprocessing.Pipe。它的构造函数为multiprocessing.Pipe([duplex]),其中duplex默认为True,表示为双向管道,否则为单向。它返回一个Connection对象的组(conn1,conn2),分别代表管道的两端。Pipe不支持进程安全,因此当有多个进程同时对管道的一端进行读操作或者写操作的时候可能会导致数据丢失或者损坏。因此在进程通信的时候,如果是超过2个以上进程,可以使用queue,但对于两个进程之间的通信而言Pipe性能更快。下面看一个示例:

from multiprocessing import Process, Pipe,Queue
import time
def reader_pipe(pipe):
     output_p, input_p = pipe             #
返回管道的两端
     input_p.close()   
     while True:
         try:
               msg = output_p.recv()    #
从pipe
中读取消息
         except EOFError:
               break
def writer_pipe(count, input_p):            #
写消息到管道中
     for i in xrange(0, count):
         input_p.send(i)            #
发送消息
def reader_queue(queue):                #
利用队列来发送消息
     while True:
         msg = queue.get()            #
从队列中获取元素
         if (msg == 'DONE'):
               break
def writer_queue(count, queue):
     for ii in xrange(0, count):
         queue.put(ii)              #
放入消息队列中
     queue.put('DONE')
if __name__=='__main__':
     print "testing for pipe:"
     for count in [10**3, 10**4, 10**5]:
         output_p, input_p = Pipe()
         reader_p = Process(target=reader_pipe, args=((output_p, input_p),))
         reader_p.start()             #
启动进程
         output_p.close()   
         _start = time.time()
         writer_pipe(count, input_p)      #
写消息到管道中
         input_p.close()
         reader_p.join()            #
等待进程处理完毕
         print "Sending %s numbers to Pipe() took %s seconds" % (count,
           (time.time() - _start))
     print "testing for queue:"
     for count in [10**3, 10**4, 10**5]:
         queue = Queue()            #
利用queue
进行通信
         reader_p = Process(target=reader_queue, args=((queue),))
         reader_p.daemon = True
         reader_p.start()   
         _start = time.time()
         writer_queue(count, queue)       #
写消息到queue
中
         reader_p.join()    
         print "Sending %s numbers to Queue() took %s seconds" % (count,
           (time.time() - _start))
输出比较:
testing for pipe:
Sending 1000 numbers to Pipe() took 0.15299987793 seconds
Sending 10000 numbers to Pipe() took 0.384999990463 seconds
Sending 100000 numbers to Pipe() took 2.09099984169 seconds
testing for queue:
Sending 1000 numbers to Queue() took 0.169000148773 seconds
Sending 10000 numbers to Queue() took 0.555000066757 seconds
Sending 100000 numbers to Queue() took 3.0790002346 seconds

上面的代码分别用来测试两个多线程的情况下使用pipe和queue进行通信发送相同数据的时候的性能,其中与pipe相关的函数为reader_pipe()和writer_pipe(),而与queue相关的主要函数为writer_queue()和reader_queue()。从函数输出可以看出,pipe所消耗的时间较小,性能更好。

2)尽量避免资源共享。相比于线程,进程之间资源共享的开销较大,因此要尽量避免资源共享。但如果不可避免,可以通过multiprocessing.Value和multiprocessing.Array或者multiprocessing.sharedctypes来实现内存共享,也可以通过服务器进程管理器Manager()来实现数据和状态的共享。这两种方式各有优势,总体来说共享内存的方式更快,效率更高,但服务器进程管理器Manager()使用起来更加方便,并且支持本地和远程内存共享。我们通过几个例子来看一下各自使用需要注意的问题。

示例一:使用Value进行内存共享。

import time
from multiprocessing import Process, Value
def func(val):                  #
多个进程同时修改val
  for i in range(10):
    time.sleep(0.1)
    val.value += 1
if __name__ == '__main__':
  v = Value('i', 0)               #
使用value
来共享内存
  processList = [Process(target=func, args=(v,)) for i in range(10)]
  for p in processList: p.start()
  for p in processList: p.join()
print v.value

上面的程序输出是多少?100对吗?你可以运行看看。Python官方文档中有个容易让人迷惑的描述:在Value的构造函数multiprocessing.Value(typecode_or_type,*args[,lock])中,如果lock的值为True会创建一个锁对象用于同步访问控制,该值默认为True。因此很多人会以为Value是进程安全的,实际上要真正控制同步访问,需要实现获取这个锁。因此上面的例子要保证每次运行都输出100,需要将函数func修改如下:

def func(val):
  for i in range(10):
    time.sleep(0.1)
       with val.get_lock():     #
仍然需要使用get_lock
方法来获取锁对象
           val.value += 1

示例二:使用Manager进行内存共享。

import multiprocessing 
def f(ns):
    ns.x.append(1)
    ns.y.append('a')
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = []                 #manager
内部包括可变对象
    ns.y = []
    print 'before process operation:', ns
    p = multiprocessing.Process(target=f, args=(ns,))
    p.start()
    p.join()
    print 'after process operation', ns     #
修改根本不会生效

本意是希望x=[1],y=[‘a’],程序输出是不是期望的结果呢?答案是否定的。这又是为什么呢?这是因为manager对象仅能传播对一个可变对象本身所做的修改,如有一个manager.list()对象,管理列表本身的任何更改会传播到所有其他进程。但是,如果容器对象内部还包括可修改的对象,则内部可修改对象的任何更改都不会传播到其他进程。因此,正确的处理方式应该是下面这种形式:

import multiprocessing 
def f(ns,x,y):
  x.append(1)
  y.append('a')
  ns.x= x                       #
将可变对象也作为参数传入
  ns.y = y
if __name__ == '__main__':
  manager = multiprocessing.Manager()
  ns = manager.Namespace()
  ns.x = []
  ns.y = []
  print 'before process operation:', ns
  p = multiprocessing.Process(target=f, args=(ns,ns.x,ns.y,))
  p.start()
  p.join()
  print 'after process operation', ns

3)注意平台之间的差异。由于Linux平台使用fork()函数来创建进程,因此父进程中所有的资源,如数据结构、打开的文件或者数据库的连接都会在子进程中共享,而Windows平台中父子进程相对独立,因此为了更好地保持平台的兼容性,最好能够将相关资源对象作为子进程的构造函数的参数传递进去。因此要避免如下方式:

f = None
def child(f):
  # do something
if __name__ == '__main__':
  f = open(filename, mode)
  p = Process(target=child)
  p.start()
p.join()
而推荐使用如下方式:
def child(f):
  print f
if __name__ == '__main__':
  f = open(filename,mode)
  p = Process(target=child,args=(f,))     #
将资源对象作为构造函数参数传入
  p.start()
  p.join()

需要注意的是,Linux平台上multiprocessing的实现是基于C库中的fork(),所有子进程与父进程的数据是完全相同,因此父进程中所有的资源,如数据结构、打开的文件或者数据库的连接都会在子进程共享。但Windows平台上由于没有fork()函数,父子进程相对独立,因此为了保持平台的兼容性,最好在脚本中加上“if__name__==“__main__”:”的判断。这样可以避免有可能出现的RuntimeError或者死锁。

4)尽量避免使用terminate()方式终止进程,并且确保pool.map中传入的参数是可以序列化的。在下面的例子中,如果直接将一个方法作为参数传入map中,会抛出cPickle.PicklingError异常,这是因为函数和方法是不可序列化的。

  def run(self):
      def f(x):
      return x*x
    p = Pool() 
    return p.map(f, [1,2,3])        #
直接传入函数f
cl = calculate()
print cl.run()                  #
抛出cPickle.PicklingError
异常

一个可行的正确做法如下:

import multiprocessing
def unwrap_self_f(arg, **kwarg):
  return calculate.f(*arg, **kwarg)       #
返回一个对象
class calculate(object):
  def f(self,x):
        return x*x
  def run(self):
        p = multiprocessing.Pool()
        return p.map(unwrap_self_f, zip([self]*3,[1,2,3]))
if __name__ == "__main__":
  cl = calculate()
  print cl.run()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文