- 内容提要
- 前言
- 作者简介
- 封面简介
- 第1章 理解高性能 Python
- 第2章 通过性能分析找到瓶颈
- 2.1 高效地分析性能
- 2.2 Julia 集合的介绍
- 2.3 计算完整的 Julia 集合
- 2.4 计时的简单方法——打印和修饰
- 2.5 用 UNIX 的 time 命令进行简单的计时
- 2.6 使用 cProfile 模块
- 2.7 用 runsnakerun 对 cProfile 的输出进行可视化
- 2.8 用 line_profiler 进行逐行分析
- 2.9 用 memory_profiler 诊断内存的用量
- 2.10 用 heapy 调查堆上的对象
- 2.11 用 dowser 实时画出变量的实例
- 2.12 用 dis 模块检查 CPython 字节码
- 2.13 在优化期间进行单元测试保持代码的正确性
- 2.14 确保性能分析成功的策略
- 2.15 小结
- 第3章 列表和元组
- 第4章 字典和集合
- 第5章 迭代器和生成器
- 第6章 矩阵和矢量计算
- 第7章 编译成 C
- 第8章 并发
- 第9章 multiprocessing 模块
- 第10章 集群和工作队列
- 第11章 使用更少的 RAM
- 第12章 现场教训
9.4 寻找素数
接下来,我们会查看在一个大数值范围内测试素数。这是一个与估算pi不同的问题,因为工作负载会变化,这取决于你在数值范围中的位置,并且每一个数字检查都具有不可预测的复杂度。我们可以创建一个串行的例程来检测素数,接着给每个进程传递可能的因子集来做检查。要并行化这个问题是令人为难的,这意味着没有需要被共享的状态才行。
Multiprocessing模块使控制工作负载变得容易,所以我们应该会调查该如何调制队列来使用(和误用!)计算资源,并且探索出一个简单的方法来稍稍更有效地来使用我们的资源。这意味着我们会看看负载平衡来设法有效地把可变复杂度的任务分配给我们的固定资源集。
如果我们有一个偶数的话,我们会使用一个稍加改进的来自本书前面章节的(请看第9页上的“理想计算模型Python虚拟机”)算法,请看例9-4。
例9-4 使用Python来寻找素数
def check_prime(n): if n % 2 == 0: return False from_i = 3 to_i = math.sqrt(n) + 1 for i in xrange(from_i, int(to_i), 2): if n % i == 0: return False return True
当用这种方式检测素数时,有多少工作负载的变化是我们看得到的?图9-10显示了当可能的素数n从10000增长到1000000时所增加的检测素数的时间开销。
大多数数字是非素数的,它们用一个点来描绘。有些可以花很少的代价检测到,然而另一些则需要检查很多因子。素数由一个x来描绘,并且形成了一个厚厚的深色带,它们是检测开销最大的。检测一个数字的时间开销随着n增长而增长,因为要检查的可能的因子的区间是以n的平方根来增长的。素数序列是不可预测的,所以我们无法决定一个数值区间的期望开销(我们可以估算它,但是不能确保它的负责度)。
对这张图表,我们对每个n测试了20次,并且采用了最快的结果来消除结果中的抖动。
图9-10 随着n增长,检测素数所需的时间
当我们给进程池分配工作时,我们可以指定要给每个工作者传递的工作量。我们可以均匀地划分所有工作并力求一次传递完,或者我们也可以创建很多工作块,当CPU空闲时就把它们传递出去。这是由chunksize参数来控制的。更大的工作块意味着更少的通信开销,而更小的工作块意味着对资源分配进行更多的控制。
对于我们的素数查寻器来说,一个单独的工作划片是一个由check_prime来检测的数字n。chunksize是10就意味着每一个进程处理一列10个整数,同时处理一列。
在图9-11中我们可以看到从1(每个任务是一个单独的工作划片)到64(每个任务是一列64个数字)之间变化的chunksize的效果。尽管有很多小任务给我们带来了最大的灵活性,它也强加了最大的通信开销。所有的4个CPU会有效地得以利用,但是当每一个任务和处理结果都经过一个单独的通信管道传输时,这个单独的信道就变成了一个瓶颈。如果我们把chunksize翻倍变成2,我们的任务就以两倍快的速度得到了解决,因为我们在通信管道上有更少的竞争。我们可能会天真地假设通过增加chunksize,我们会继续缩短执行时间。无论如何,就如你能在图表中所见的那样,我们将再次遇到一个回报减弱的点。
图9-11 选择一个合理的chunksize值
我们可以继续增大chunksize直到开始发现表现变坏。在图9-12中我们扩展了块尺寸的区间,使得它们不但仅仅有小块,而且还有巨块。在区间的大端,最坏的结果显示为1.31秒,在那里我们已让chunksize变成了50000——这意味着我们的100000项被划分成了两个工作块,使得两个CPU在整个扫描过程都空闲了下来。而使用具有10000项的chunksize,我们创建了10个工作块,这意味着4个工作块将并行地运行两遍,接下来再运行剩余的2个工作块。这就让两个CPU在第三轮工作中空闲下来,是对资源的低效利用。
在这种情况下一个优化的解决方案是把所有数量的任务根据CPU的数量来划分。这是multiprocessing的默认行为,在图中显示为“默认”的黑点。
作为一个通用规则,默认的行为是明智的,只有当你期望看见一个真正的收益,并且对比默认行为确切地去证实你的假设时,才去调整它。
与蒙特卡罗的pi问题不同,我们的素数检测计算有着可变的复杂度——有时一个任务快速地结束了(偶数检测得最快),而有时一个数字很大,而且是素数(这要花费长得多的时间去检测)。
图9-12 选择一个合理的chunksize值(继续)
如果我们随机化我们的工作序列,会发生什么呢?对于这个问题,我们压榨出了2%的性能收益,就如你在图9-13中所见的那样。通过随机化,我们减少了序列中的最后任务比其他任务要花费更久时间运行,从而只让一个CPU处于活动状态的可能性。
就如我们更早的例子使用一个10000的chunksize所演示的那样,错配工作负载和可利用的资源会导致低效。在那种情况下,我们创建了三轮工作:开始的两轮使用了100%的资源,而最后一轮仅使用了50%。
图9-14展示了当我们错配工作块数量和处理器数量时,奇怪的现象会发生。错配会导致对资源利用不足。当仅有一个工作块被创建时,发生了最慢的整体运行时间。两个工作块让两个CPU没有得到利用,依次类推,只有当我们有4个工作块时,我们才使用上了所有的资源。但是如果我们增加了第5个工作块,那么我们会再次对资源利用不足——4个CPU会工作于它们的块上,接着一个CPU将运行计算第5个块。
图9-13 随机化任务队列
当我们增大了工作块数量时,我们看到低效程度减弱了——29和32个工作块的运行时间差异大约是0.01秒。通用规则就是如果你的任务运行时是可变的,那就创建许多的小任务来有效使用资源。
有一些策略用来有效使用multiprocessing解决棘手的并行问题:
把你的工作拆分成独立的工作单元。
如果你的工作者所花的时间是可变的,那就考虑随机化工作序列(另一个例子就是处理大小可变的文件)。
对你的工作队列进行排序,这样首先处理最慢的任务可能是一个平均来说有用的策略。
使用默认的chunksize,除非你已经验证了调节它的理由。
让任务数量与物理CPU数量保持一致(默认的chunksize再次为你考虑到了,尽管它默认会使用超线程,这样可能不会提供额外的性能收益)。
图9-14 选择不合适的块数量的危险性
注意默认情况下,multiprocessing会把超线程视作附加的CPU。这意味着在Ian的笔记本电脑上,它会分配8个进程,而只有4个会真正跑出100%的速度。多出的4个进程可能占用了珍贵的RAM,却几乎没有提供任何额外的速度提升。
使用一个池,我们可以把一块预定义的工作事先在可用的CPU上拆分。然而如果我们有动态的工作负载,尤其是如果我们有随时间而来的工作负载,这样做帮助就减少了。对于这种类型的工作负载,我们可能想要使用一个Queue,在下一节会介绍。
备忘
如果你正工作于一个长期运行的科学问题,每个任务花费许多秒(或更长)来运行,那么你可能会想要检视下Cael Varoqaux的joblib。这个工具支持轻量级的流水线,它在multiprocessing之上设置,并且提供了一个更简单的并行接口、结果缓存和调试功能。
工作队列
multiprocessing.Queue对象给我们非持久性的队列,能够在进程间传送任何可序列化(pickleable)的Python对象。当每个对象必须要被序列化(pickle)来传送,接着在消费者那里复原(伴随着一些加锁操作)时,它们就带来了一个开销。在下面的例子中,我们会看到这个代价是不可忽略的。无论如何,如果你的工作者正在处理更大的任务,那么通信开销可能是可接受的。
使用队列来工作相当简单。在这个例子中,我们会检测素数,通过消费一列候选数字并且把确认的素数发回一个definite_primes_queue。我们会使用一个、两个、四个和八个进程来运行,并且证实后者都会比只运行一个单独的进程来检测相同的区间花费更长的时间。
Queue带给我们使用原生的Python对象来执行许多进程间通信的能力。如果你正在用许多状态在对象间相互传递,这可能是有用的。然而,因为Queue缺乏持久性,你可能不想用它们来做在面临失效时需要鲁棒性的工作(例如,如果你断电了或者硬盘崩坏了)。
例9-5展示了check_prime函数。我们已经熟悉了基本的素数测试方法。我们运行于一个无限循环中,阻塞于(等待直到有可用的任务为止)possible_primes_ queue.get()上来从队列中消费一项任务。只有一个进程能够在同一时刻得到一项任务,因为Queue对象考虑到了同步存取。如果队列中没有任务,那么.get()就阻塞直到任务可用。当素数被找到时,它们被放回definite_primes_queue中来为父进程所消费。
例9-5 使用两个队列来IPC(进程间通信)
FLAG_ALL_DONE = b"WORK_FINISHED" FLAG_WORKER_FINISHED_PROCESSING = b"WORKER_FINISHED_PROCESSING" def check_prime(possible_primes_queue, definite_primes_queue): while True: n = possible_primes_queue.get() if n == FLAG_ALL_DONE: # flag that our results have all been pushed to the results queue definite_primes_queue.put(FLAG_WORKER_FINISHED_PROCESSING) break else: if n % 2 == 0: continue for i in xrange(3, int(math.sqrt(n)) + 1, 2): if n % i == 0: break else: definite_primes_queue.put(n)
我们定义了两个标记:一个由父进程来表明没有可用的工作了,而第二个由工作者来确认它已经看到了毒药,并把自己关闭。第一个毒药也叫哨兵,因为它保证终结处理循环。
当处理工作队列和远程工作者时,使用像那样的标记有助于来记录毒药已送出,并且检查响应已在合理的时间窗口由子进程送出,从而表明它们正在关闭中。我们在这里不处理那个进程,但是增加一些时间记录对代码是一种相对简单的添加。这些标记的接收依据在调试期间能够被记入日志或者打印出来。
Queue对象创建于例9-6中的Manager。我们会使用熟悉的过程来构建一个Process对象列表,每一个包含了一个派生(fork)进程。两个队列被当作参数送出,multiprocessing处理它们的同步。已经启动新进程后,我们就把一个任务列表移交给possible_primes_queue,并且用毒药来终结每个进程。任务会以先进先出的顺序被消费,毒药留在最后面。在check_prime中,我们使用一个blocking.get(),因为新进程不得不等待工作在队列中出现。既然我们使用了标记,我们就可能会增加一些工作,处理结果,接着通过增加更多的工作来遍历,并且通过在之后增加毒药来表明工作者生命的终结。
例9-6 为IPC(进程间通信)构建两个队列
if __name__ == "__main__": primes = [] manager = multiprocessing.Manager() possible_primes_queue = manager.Queue() definite_primes_queue = manager.Queue() NBR_PROCESSES = 2 pool = Pool(processes=NBR_PROCESSES) processes = [] for _ in range(NBR_PROCESSES): p = multiprocessing.Process(target=check_prime, args=(possible_primes_queue, definite_primes_queue)) processes.append(p) p.start() t1 = time.time() number_range = xrange(100000000, 101000000) # add jobs to the inbound work queue for possible_prime in number_range: possible_primes_queue.put(possible_prime) # add poison pills to stop the remote workers for n in xrange(NBR_PROCESSES): possible_primes_queue.put(FLAG_ALL_DONE)
为了消费结果,我们在例9-7中启动了另一个无限循环,在definite_primes_ queue上使用一个blocking.get()。如果finished-processing标记找到了,那么我们就对表明自己终结退出的进程计数。如果没有找到,那么我们就有一个新素数并把它添加到素数列表中去。当我们所有的进程已经表明自己终结退出时,我们就结束无限循环。
例9-7 为IPC(进程间通信)使用两个队列
processors_indicating_they_have_finished = 0 while True: new_result = definite_primes_queue.get() # block while waiting for results if new_result == FLAG_WORKER_FINISHED_PROCESSING: processors_indicating_they_have_finished += 1 if processors_indicating_they_have_finished == NBR_PROCESSES: break else: primes.append(new_result) assert processors_indicating_they_have_finished == NBR_PROCESSES print "Took:", time.time() - t1 print len(primes), primes[:10], primes[-10:]
归因于序列化(pickle)和同步,使用Queue具有相当的开销。就如你在图9-15中所能看到的那样,使用一个更少的Queue的单进程解决方案明显要比使用两个或多个进程的要快。这种情况的原因就是我们的工作负载很轻——对于这个任务,通信开销占据了整体时间的大部分。使用Queues,两个进程完成这个例子要比一个进程稍快一点,而四个或八个进程则比一个进程要更慢。
如果你的任务有较长的完成时间(至少相当多的秒数)和少量的通信,那么Queue的方式可能是正确的答案。你将不得不验证通信开销是否让这种方式足够有效。
你可能想知道如果我们移除了多余的一半工作队列(所有的偶数——这些偶数在check_prime中被很快地剔除了)会发生什么。把输入队列减半在每一种情况下都让我们的执行时间减半了,但是它还是没有战胜单进程非队列的例子!这有助于演示通信开销在这个问题中占主导因素。
图9-15 使用Queue对象的开销
异步地给Queue添加工作
通过给主进程增加一个Thread,我们能够异步地给possible_primes_queue提供工作。在例9-8中,我们定义了一个feed_new_jobs函数:它就如我们在__main__之前的工作设置例程那样,执行相同的工作,但是它却是在一个独立的线程中做的。
例9-8 异步工作提供函数
def feed_new_jobs(number_range, possible_primes_queue, nbr_poison_pills): for possible_prime in number_range: possible_primes_queue.put(possible_prime) # add poison pills to stop the remote workers for n in xrange(nbr_poison_pills): possible_primes_queue.put(FLAG_ALL_DONE)
现在,在例9-9中,我们的__main__将使用possible_primes_queue来设置Thread,接着在任何工作被发起前,继续移动到结果收集阶段。异步工作提供者能够从外部源中消费工作(例如,从一个数据库或者I/O密集型的通信中),而__main__线程来操作每一个处理后的结果。这意味着输入序列和输出序列不需要提前被创建,它们都能够被即时处理。
例9-9 使用一个线程来设置一个异步工作提供者
if __name__ == "__main__": primes = [] manager = multiprocessing.Manager() possible_primes_queue = manager.Queue() ... import threading thrd = threading.Thread(target=feed_new_jobs, args=(number_range, possible_primes_queue, NBR_PROCESSES)) thrd.start() # deal with the results
如果你想要健壮的异步系统,你几乎一定要看看成熟的外部库。gevent、tornado和Twisted是强力的候选者,Python 3.4的tulip是一个新的竞争者。我们在这里看的例子将让你起步,但是实际上相比对生产系统的作用,它们对很简单的系统和培训来说更有用。
备忘
另一个你可能想要调查的单机队列是PyRes。这个模块使用了Redis(在9.5.5节介绍过)来存储队列的状态。Redis是一个非Python的数据存储系统,这意味着由Redis所持有的队列数据在Python之外是可读的,并且能够被非Python的系统所共享。
要非常注意,异步系统需要一个特殊级别的耐心——当你在调试时,你会在撕扯你的头发中结束。我们要建议:
应用“保持简单、愚蠢”的原则。
如果有可能尽量避免异步的自包含系统(就像我们的例子),因为它们的复杂度会增长,并且很快变得难以维护。
使用成熟的库,就像gevent(在前一章中所描述的)那样给予你尝试和检验的方法来处理某些问题集。
而且,我们强烈建议使用一个外部的队列系统(例如,Gearman、0MQ、Celery、PyRes或者HotQueue)来给予你对队列状态的外部可视性。这需要更多的思考,但是归因于增加的调试效率和对生产系统的更好的系统可视性,可能会节省你的时间。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论