- 内容提要
- 前言
- 作者简介
- 封面简介
- 第1章 理解高性能 Python
- 第2章 通过性能分析找到瓶颈
- 2.1 高效地分析性能
- 2.2 Julia 集合的介绍
- 2.3 计算完整的 Julia 集合
- 2.4 计时的简单方法——打印和修饰
- 2.5 用 UNIX 的 time 命令进行简单的计时
- 2.6 使用 cProfile 模块
- 2.7 用 runsnakerun 对 cProfile 的输出进行可视化
- 2.8 用 line_profiler 进行逐行分析
- 2.9 用 memory_profiler 诊断内存的用量
- 2.10 用 heapy 调查堆上的对象
- 2.11 用 dowser 实时画出变量的实例
- 2.12 用 dis 模块检查 CPython 字节码
- 2.13 在优化期间进行单元测试保持代码的正确性
- 2.14 确保性能分析成功的策略
- 2.15 小结
- 第3章 列表和元组
- 第4章 字典和集合
- 第5章 迭代器和生成器
- 第6章 矩阵和矢量计算
- 第7章 编译成 C
- 第8章 并发
- 第9章 multiprocessing 模块
- 第10章 集群和工作队列
- 第11章 使用更少的 RAM
- 第12章 现场教训
9.6 用 multiprocessing 来共享 numpy 数据
当工作于大numpy数组时,你一定会想知道是否能够在进程间为读写存取来共享数据,而不用拷贝数据。这是可能的,尽管有一点烦琐。为了这个演示的灵感,我们要感谢StackOverflow的用户pv。
警告
不要使用这个方法来重建BLAS、MKL、Accelerate和ATLAS的行为。这些库都用它们的原语来支持multiprocessing,可能它们要比你所创建的任何新例程更好调试。尽管它们会需要一些配置来开启multiprocessing支持,但是在你投入时间(也失去了调试的时间!)来写你自己的代码之前,看看这些库是否能够给你带来免费的速度提升是明智的。
在进程间共享一个大矩阵有几个收益:
只有一个拷贝意味着没有浪费RAM。
不浪费时间来拷贝大块内存。
你得到了在进程间共享部分结果的可能性。
回想起9.3.3节中使用numpy来估算pi的演示,我们出现问题,那就是随机数生成是一个串行过程。在这里,我们可以想象派生(fork)进程共享了一个大数组,每一个进程使用不同种子的随机数生成器用随机数来填充数组的一个区域,因此生成完一个大随机块可能要比用一个单独的进程要快。
为了验证它,我们修改了接下来的演示代码来以一个串行过程创建一个大随机矩阵(10000乘80000个元素),再把矩阵拆分成4段来并行调用random(在这两种情况下,同一时间处理一行)。串行过程花费15秒,并行版本花费4秒。回去参考9.3.2节来理解并行化随机数生成器的一些危险性。
在本节的余下部分,我们会使用一个简化的版本来演示一点,而剩下的容易验证。
在图9-19中,你能看见Ian的笔记本电脑上的htop输出。它显示了4个子进程的父进程(PID是11268),所有这5个进程共享了一个单独的10000×80000个元素的double型的numpy数组。这个数组的一份拷贝耗费6.4GB,而笔记本电脑只有8GB——你能看见在htop中通过进程计量器显示出Mem读取最多占用了7491MB的RAM。
图9-19 htop显示了RAM和swap使用
为了理解这个演示,我们会首先过一遍控制台的输出,接着我们会看看代码。在 例9-24中,我们启动了父进程:它分配了一个6.4GB的double型的10000×80000维的数组,以零值来填充。10000行会作为索引传递给工作者函数,工作者将依次在每列80000项上操作。已经分配了数组,我们就用生命、宇宙,以及万物的答案(42!)来填充。我们能够在工作者函数中测试我们正接收着这个修改过的数组以及一个非填充0的版本来确认这份代码的表现和预期的一样。
例9-24 设置共享数组
$ python np_shared.py Created shared array with 6,400,000,000 nbytes Shared array id is 20255664 in PID 11268 Starting with an array of 0 values: [[ 0. 0. 0. ..., 0. 0. 0.] ..., [ 0. 0. 0. ..., 0. 0. 0.]] Original array filled with value 42: [[ 42. 42. 42. ..., 42. 42. 42.] ..., [ 42. 42. 42. ..., 42. 42. 42.]] Press a key to start workers using multiprocessing...
在例9-25中,我们已经启动了4个进程工作于这个共享数组。不做任何的数组拷贝,每个进程着眼于相同的大内存块,并且每个进程有不同的索引集来工作。工作者每隔几千行输出当前的索引和它的PID,所以我们可以观察它的行为。工作者的任务是琐碎的——它会检查当前元素还是设置在默认值(所以我们知道没有其他进程修改过它),接着它会用当前PID覆盖掉这个值。一旦工作者完成了,我们就回到父进程,再次打印数组。这次,我们看见它填充了PID而不是42。
例9-25 在共享数组上运行worker_fn
worker_fn: with idx 0 id of shared_array is 20255664 in PID 11288 worker_fn: with idx 2000 id of shared_array is 20255664 in PID 11291 worker_fn: with idx 1000 id of shared_array is 20255664 in PID 11289 ... worker_fn: with idx 8000 id of shared_array is 20255664 in PID 11290 The default value has been over-written with worker_fn's result: [[ 11288. 11288. 11288. ..., 11288. 11288. 11288.] ..., [ 11291. 11291. 11291. ..., 11291. 11291. 11291.]]
最后,在例9-26中我们使用了一个计数器来确认在数组中的每个PID的频率。因为工作是均匀划分的,我们期待4 个PID 中的每一个表示相等的次数。在我们的800000000 个元素的数组中,我们看到了 4 组,每组200000000 个 PID。使用PrettyTable来呈现表格输出。
例9-26 在共享数组上验证结果
Verification - extracting unique values from 800,000,000 items in the numpy array (this might be slow)... Unique values in shared_array: +---------+-----------+ | PID | Count | +---------+-----------+ | 11288.0 | 200000000 | | 11289.0 | 200000000 | | 11290.0 | 200000000 | | 11291.0 | 200000000 | +---------+-----------+ Press a key to exit...
已经完成了,现在程序退出,数组被删除。
我们能够在Linux下用ps和pmap来查看每个进程内部。例9-27显示了调用ps的结果。分割这个命令行:
ps告诉我们有关进程的信息。
-A列出所有的进程。
-o pid、size、vsize、cmd输出PID、大小信息和命令的名字。
grep被用来过滤掉所有其他的结果并且只留下给我们演示的行。
父进程(PID 11268)和它的4个派生(fork)子进程显示在了输出中。结果类似于我们在htop中所见的。我们可以使用pmap来看看每个进程的内存映射,用–x来请求扩展输出。我们grep出模式s-来列出标记为正在共享的内存块。在父进程和子进程中,我们看见一个6250000KB(6.2GB)的块在它们之间共享。
例9-27 使用pmap和ps来查看从操作系统视角中看到的进程
$ ps -A -o pid,size,vsize,cmd | grep np_shared 11268 232464 6564988 python np_shared.py 11288 11232 6343756 python np_shared.py 11289 11228 6343752 python np_shared.py 11290 11228 6343752 python np_shared.py 11291 11228 6343752 python np_shared.py ian@ian-Latitude-E6420 $ pmap -x 11268 | grep s- Address Kbytes RSS Dirty Mode Mapping 00007f1953663000 6250000 6250000 6250000 rw-s- zero (deleted) ... ian@ian-Latitude-E6420 $ pmap -x 11288 | grep s- Address Kbytes RSS Dirty Mode Mapping 00007f1953663000 6250000 1562512 1562512 rw-s- zero (deleted)
例9-28显示了为共享这个数组所采取的重要步骤。 我们使用一个multiprocessing. ``Array来分配一块共享内存作为一个1维数组。接着我们从这个对象中实例化了一个numpy数组,并把它重塑回一个2维数组。现在我们有一个numpy包装的内存块,能够在进程间共享,并且能够像一个普通numpy数组那样来寻址。numpy没有管理RAM,multiprocessing.Array在管理它。
例9-28 使用multiprocessing来共享numpy数组
import os import multiprocessing from collections import Counter import ctypes import numpy as np from prettytable import PrettyTable SIZE_A, SIZE_B = 10000, 80000 # 6.2GB - starts to use swap (maximal RAM usage)
在例9-29中,我们可以看见每个派生(fork)进程访问了一个全局的main_nparray。派生(fork)进程有一个numpy对象的拷贝,对象所访问的底层字节作为共享内存来存储。我们的worker_fn将使用当前的进程标识符来覆写一个被选取的行(通过idx)。
例9-29 worker_fn为共享numpy数组使用multiprocessing
def worker_fn(idx): """Do some work on the shared np array on row idx""" # confirm that no other process has modified this value already assert main_nparray[idx, 0] == DEFAULT_VALUE # inside the subprocess print the PID and id of the array # to check we don't have a copy if idx % 1000 == 0: print " {}: with idx {}\n id of local_nparray_in_process is {} in PID {}"\ .format(worker_fn.__name__, idx, id(main_nparray), os.getpid()) # we can do any work on the array; here we set every item in this row to # have the value of the process ID for this process main_nparray[idx, :] = os.getpid()
在我们例9-30的__main__中,我们通过三个主要阶段来工作:
1.构建一个共享的multiprocessing.Array并却把它转换成一个numpy数组。
2.给数组设置一个默认值,并生成4个进程来并行地在数组上工作。
3.在进程返回后,验证数组的内容。
典型情况下,你设置了一个numpy数组,并在一个单独的进程中工作,可能就像arr = np.array((100, 5), dtype = np.float_)那样来做些事情。这在一个单独进程中不错,但是你不能跨进程来共享数据,既不能写,也不能读。
技巧就是创建一个共享的字节块。一种方式是创建一个multiprocessing. Array。Array默认包在锁中来防止并发编辑,但是我们不需要这把锁,因为我们会对我们的共享模式小心翼翼。为了清晰地与其他组员进行沟通,把它显式化并设置lock = False是值得的。
如果你不去设置lock = False,那么你会得到一个对象,而不是一个对字节的引用,并且你需要调用.get_obj()来得到字节。通过调用.get_obj(),你绕开了锁,所以在第一步中,不显式地去做,就没有任何值。
接下来我们就采用这个共享字节块,并使用frombuffer来包装成一个numpy数组。dtype是可选的,但是既然我们是在传送字节,显式化总是合理的。我们做了重塑,这样我们就能够以一个2维数组来寻址字节。数组值默认设置成了0。例9-30显示出我们的__main__是满的。
例9-30 为共享而设置numpy数组的主函数
if __name__ == '__main__': DEFAULT_VALUE = 42 NBR_OF_PROCESSES = 4 # create a block of bytes, reshape into a local numpy array NBR_ITEMS_IN_ARRAY = SIZE_A * SIZE_B shared_array_base = multiprocessing.Array(ctypes.c_double, NBR_ITEMS_IN_ARRAY, lock=False) main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double) main_nparray = main_nparray.reshape(SIZE_A, SIZE_B) # assert no copy was made assert main_nparray.base.base is shared_array_base print "Created shared array with {:,} nbytes".format(main_nparray.nbytes) print "Shared array id is {} in PID {}".format(id(main_nparray), os.getpid()) print "Starting with an array of 0 values:" print main_nparray print
为了证实我们的进程是在我们所启动的相同的数据块上操作,我们会为每一项设置一个新的DEFAULT_VALUE——所以你会看到在例9-31的顶部(我们用生命、宇宙和万物的答案)。下一步,我们构建了一个进程池(在这个例子中是4个进程),接着通过调用map来批量发送行索引。
例9-31 使用multiprocessing来共享numpy数组的主函数
# modify the data via our local numpy array main_nparray.fill(DEFAULT_VALUE) print "Original array filled with value {}:".format(DEFAULT_VALUE) print main_nparray raw_input("Press a key to start workers using multiprocessing...") print # create a pool of processes that will share the memory block # of the global numpy array, and share the reference to the underlying # block of data so we can build a numpy array wrapper in the new processes pool = multiprocessing.Pool(processes=NBR_OF_PROCESSES) # perform a map where each row index is passed as a parameter to the # worker_fn pool.map(worker_fn, xrange(SIZE_A))
一旦我们完成了并行处理,我们回到父进程来验证结果(例9-32)。验证步骤在数组上通过一个平面化的视图来运行(注意,视图不做拷贝,它只是在2维数组上创建了一个1维的可迭代视图),为每个PID的频率计数。最后,我们执行了一些assert检查来确保我们得到了期望的计数。
例9-32 验证共享结果的主函数
print "Verification - extracting unique values from {:,} items\nin the numpy array (this might be slow)...".format(NBR_ITEMS_IN_ARRAY) # main_nparray.flat iterates over the contents of the array, it doesn't # make a copy counter = Counter(main_nparray.flat) print "Unique values in main_nparray:" tbl = PrettyTable(["PID", "Count"]) for pid, count in counter.items(): tbl.add_row([pid, count]) print tbl total_items_set_in_array = sum(counter.values()) # check that we have set every item in the array away from DEFAULT_VALUE assert DEFAULT_VALUE not in counter.keys() # check that we have accounted for every item in the array assert total_items_set_in_array == NBR_ITEMS_IN_ARRAY # check that we have NBR_OF_PROCESSES of unique keys to confirm that every # process did some of the work assert len(counter) == NBR_OF_PROCESSES raw_input("Press a key to exit...")
我们只创建了一个1维的字节数组,把它转换为一个2维数组,在4个进程间共享数组,并允许它们在相同的内存块上并发处理。这种方法有助你在许多核上搞并行化。然而,要小心对相同数据点的并发存取——如果你想要避免同步的问题,你就不得不在multiprocessing中使用锁,这会拖慢你的代码。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论