返回介绍

9.6 用 multiprocessing 来共享 numpy 数据

发布于 2024-01-25 21:44:08 字数 8961 浏览 0 评论 0 收藏 0

当工作于大numpy数组时,你一定会想知道是否能够在进程间为读写存取来共享数据,而不用拷贝数据。这是可能的,尽管有一点烦琐。为了这个演示的灵感,我们要感谢StackOverflow的用户pv。

 警告 

不要使用这个方法来重建BLAS、MKL、Accelerate和ATLAS的行为。这些库都用它们的原语来支持multiprocessing,可能它们要比你所创建的任何新例程更好调试。尽管它们会需要一些配置来开启multiprocessing支持,但是在你投入时间(也失去了调试的时间!)来写你自己的代码之前,看看这些库是否能够给你带来免费的速度提升是明智的。

在进程间共享一个大矩阵有几个收益:

只有一个拷贝意味着没有浪费RAM。

不浪费时间来拷贝大块内存。

你得到了在进程间共享部分结果的可能性。

回想起9.3.3节中使用numpy来估算pi的演示,我们出现问题,那就是随机数生成是一个串行过程。在这里,我们可以想象派生(fork)进程共享了一个大数组,每一个进程使用不同种子的随机数生成器用随机数来填充数组的一个区域,因此生成完一个大随机块可能要比用一个单独的进程要快。

为了验证它,我们修改了接下来的演示代码来以一个串行过程创建一个大随机矩阵(10000乘80000个元素),再把矩阵拆分成4段来并行调用random(在这两种情况下,同一时间处理一行)。串行过程花费15秒,并行版本花费4秒。回去参考9.3.2节来理解并行化随机数生成器的一些危险性。

在本节的余下部分,我们会使用一个简化的版本来演示一点,而剩下的容易验证。

在图9-19中,你能看见Ian的笔记本电脑上的htop输出。它显示了4个子进程的父进程(PID是11268),所有这5个进程共享了一个单独的10000×80000个元素的double型的numpy数组。这个数组的一份拷贝耗费6.4GB,而笔记本电脑只有8GB——你能看见在htop中通过进程计量器显示出Mem读取最多占用了7491MB的RAM。

图9-19 htop显示了RAM和swap使用

为了理解这个演示,我们会首先过一遍控制台的输出,接着我们会看看代码。在  例9-24中,我们启动了父进程:它分配了一个6.4GB的double型的10000×80000维的数组,以零值来填充。10000行会作为索引传递给工作者函数,工作者将依次在每列80000项上操作。已经分配了数组,我们就用生命、宇宙,以及万物的答案(42!)来填充。我们能够在工作者函数中测试我们正接收着这个修改过的数组以及一个非填充0的版本来确认这份代码的表现和预期的一样。

例9-24 设置共享数组

$ python np_shared.py
Created shared array with 6,400,000,000 nbytes
Shared array id is 20255664 in PID 11268
Starting with an array of 0 values:
[[ 0. 0. 0. ..., 0. 0. 0.]

 ...,
 [ 0. 0. 0. ..., 0. 0. 0.]]

Original array filled with value 42:
[[ 42. 42. 42. ..., 42. 42. 42.]
 ...,
 [ 42. 42. 42. ..., 42. 42. 42.]]
Press a key to start workers using multiprocessing...

在例9-25中,我们已经启动了4个进程工作于这个共享数组。不做任何的数组拷贝,每个进程着眼于相同的大内存块,并且每个进程有不同的索引集来工作。工作者每隔几千行输出当前的索引和它的PID,所以我们可以观察它的行为。工作者的任务是琐碎的——它会检查当前元素还是设置在默认值(所以我们知道没有其他进程修改过它),接着它会用当前PID覆盖掉这个值。一旦工作者完成了,我们就回到父进程,再次打印数组。这次,我们看见它填充了PID而不是42。

例9-25 在共享数组上运行worker_fn

worker_fn: with idx 0
 id of shared_array is 20255664 in PID 11288
worker_fn: with idx 2000
 id of shared_array is 20255664 in PID 11291
worker_fn: with idx 1000
 id of shared_array is 20255664 in PID 11289
...
worker_fn: with idx 8000
 id of shared_array is 20255664 in PID 11290

The default value has been over-written with worker_fn's result:
[[ 11288. 11288. 11288. ..., 11288. 11288. 11288.]
 ...,
 [ 11291. 11291. 11291. ..., 11291. 11291. 11291.]]

最后,在例9-26中我们使用了一个计数器来确认在数组中的每个PID的频率。因为工作是均匀划分的,我们期待4 个PID 中的每一个表示相等的次数。在我们的800000000 个元素的数组中,我们看到了 4 组,每组200000000 个 PID。使用PrettyTable来呈现表格输出。

例9-26 在共享数组上验证结果

Verification - extracting unique values from 800,000,000 items
in the numpy array (this might be slow)...
Unique values in shared_array:
+---------+-----------+
| PID    | Count    |
+---------+-----------+
| 11288.0 | 200000000 |
| 11289.0 | 200000000 |
| 11290.0 | 200000000 |
| 11291.0 | 200000000 |
+---------+-----------+
Press a key to exit...

已经完成了,现在程序退出,数组被删除。

我们能够在Linux下用ps和pmap来查看每个进程内部。例9-27显示了调用ps的结果。分割这个命令行:

ps告诉我们有关进程的信息。

-A列出所有的进程。

-o pid、size、vsize、cmd输出PID、大小信息和命令的名字。

grep被用来过滤掉所有其他的结果并且只留下给我们演示的行。

父进程(PID 11268)和它的4个派生(fork)子进程显示在了输出中。结果类似于我们在htop中所见的。我们可以使用pmap来看看每个进程的内存映射,用–x来请求扩展输出。我们grep出模式s-来列出标记为正在共享的内存块。在父进程和子进程中,我们看见一个6250000KB(6.2GB)的块在它们之间共享。

例9-27 使用pmap和ps来查看从操作系统视角中看到的进程

$ ps -A -o pid,size,vsize,cmd | grep np_shared
11268 232464 6564988 python np_shared.py
11288 11232 6343756 python np_shared.py
11289 11228 6343752 python np_shared.py
11290 11228 6343752 python np_shared.py
11291 11228 6343752 python np_shared.py

ian@ian-Latitude-E6420 $ pmap -x 11268 | grep s-
Address       Kbytes  RSS   Dirty Mode  Mapping
00007f1953663000 6250000 6250000 6250000 rw-s-  zero (deleted)
...
ian@ian-Latitude-E6420 $ pmap -x 11288 | grep s-
Address       Kbytes  RSS   Dirty Mode  Mapping
00007f1953663000 6250000 1562512 1562512 rw-s-  zero (deleted)

例9-28显示了为共享这个数组所采取的重要步骤。 我们使用一个multiprocessing. ``Array来分配一块共享内存作为一个1维数组。接着我们从这个对象中实例化了一个numpy数组,并把它重塑回一个2维数组。现在我们有一个numpy包装的内存块,能够在进程间共享,并且能够像一个普通numpy数组那样来寻址。numpy没有管理RAM,multiprocessing.Array在管理它。

例9-28 使用multiprocessing来共享numpy数组

import os
import multiprocessing
from collections import Counter
import ctypes
import numpy as np
from prettytable import PrettyTable

SIZE_A, SIZE_B = 10000, 80000  # 6.2GB - starts to use swap (maximal RAM usage)

在例9-29中,我们可以看见每个派生(fork)进程访问了一个全局的main_nparray。派生(fork)进程有一个numpy对象的拷贝,对象所访问的底层字节作为共享内存来存储。我们的worker_fn将使用当前的进程标识符来覆写一个被选取的行(通过idx)。

例9-29 worker_fn为共享numpy数组使用multiprocessing

def worker_fn(idx):
  """Do some work on the shared np array on row idx"""
  # confirm that no other process has modified this value already
  assert main_nparray[idx, 0] == DEFAULT_VALUE
  # inside the subprocess print the PID and id of the array
  # to check we don't have a copy
  if idx % 1000 == 0:
       print " {}: with idx {}\n id of local_nparray_in_process is {} in PID {}"\
       .format(worker_fn.__name__, idx, id(main_nparray), os.getpid())
# we can do any work on the array; here we set every item in this row to
# have the value of the process ID for this process
main_nparray[idx, :] = os.getpid()

在我们例9-30的__main__中,我们通过三个主要阶段来工作:

1.构建一个共享的multiprocessing.Array并却把它转换成一个numpy数组。

2.给数组设置一个默认值,并生成4个进程来并行地在数组上工作。

3.在进程返回后,验证数组的内容。

典型情况下,你设置了一个numpy数组,并在一个单独的进程中工作,可能就像arr = np.array((100, 5), dtype = np.float_)那样来做些事情。这在一个单独进程中不错,但是你不能跨进程来共享数据,既不能写,也不能读。

技巧就是创建一个共享的字节块。一种方式是创建一个multiprocessing. Array。Array默认包在锁中来防止并发编辑,但是我们不需要这把锁,因为我们会对我们的共享模式小心翼翼。为了清晰地与其他组员进行沟通,把它显式化并设置lock = False是值得的。

如果你不去设置lock = False,那么你会得到一个对象,而不是一个对字节的引用,并且你需要调用.get_obj()来得到字节。通过调用.get_obj(),你绕开了锁,所以在第一步中,不显式地去做,就没有任何值。

接下来我们就采用这个共享字节块,并使用frombuffer来包装成一个numpy数组。dtype是可选的,但是既然我们是在传送字节,显式化总是合理的。我们做了重塑,这样我们就能够以一个2维数组来寻址字节。数组值默认设置成了0。例9-30显示出我们的__main__是满的。

例9-30 为共享而设置numpy数组的主函数

if __name__ == '__main__':
  DEFAULT_VALUE = 42
  NBR_OF_PROCESSES = 4

  # create a block of bytes, reshape into a local numpy array
  NBR_ITEMS_IN_ARRAY = SIZE_A * SIZE_B
  shared_array_base = multiprocessing.Array(ctypes.c_double,
                        NBR_ITEMS_IN_ARRAY, lock=False)
  main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double)
  main_nparray = main_nparray.reshape(SIZE_A, SIZE_B)
  # assert no copy was made
  assert main_nparray.base.base is shared_array_base
  print "Created shared array with {:,} nbytes".format(main_nparray.nbytes)
  print "Shared array id is {} in PID {}".format(id(main_nparray), os.getpid())
  print "Starting with an array of 0 values:"
  print main_nparray
  print

为了证实我们的进程是在我们所启动的相同的数据块上操作,我们会为每一项设置一个新的DEFAULT_VALUE——所以你会看到在例9-31的顶部(我们用生命、宇宙和万物的答案)。下一步,我们构建了一个进程池(在这个例子中是4个进程),接着通过调用map来批量发送行索引。

例9-31 使用multiprocessing来共享numpy数组的主函数

  # modify the data via our local numpy array
  main_nparray.fill(DEFAULT_VALUE)
  print "Original array filled with value {}:".format(DEFAULT_VALUE)
  print main_nparray

  raw_input("Press a key to start workers using multiprocessing...")
  print

  # create a pool of processes that will share the memory block
  # of the global numpy array, and share the reference to the underlying
  # block of data so we can build a numpy array wrapper in the new processes
  pool = multiprocessing.Pool(processes=NBR_OF_PROCESSES)
  # perform a map where each row index is passed as a parameter to the
  # worker_fn
  pool.map(worker_fn, xrange(SIZE_A))

一旦我们完成了并行处理,我们回到父进程来验证结果(例9-32)。验证步骤在数组上通过一个平面化的视图来运行(注意,视图不做拷贝,它只是在2维数组上创建了一个1维的可迭代视图),为每个PID的频率计数。最后,我们执行了一些assert检查来确保我们得到了期望的计数。

例9-32 验证共享结果的主函数

  print "Verification - extracting unique values from {:,} items\nin the numpy
    array (this might be slow)...".format(NBR_ITEMS_IN_ARRAY)
  # main_nparray.flat iterates over the contents of the array, it doesn't
  # make a copy
  counter = Counter(main_nparray.flat)
  print "Unique values in main_nparray:"
  tbl = PrettyTable(["PID", "Count"])
  for pid, count in counter.items():
    tbl.add_row([pid, count])
  print tbl

  total_items_set_in_array = sum(counter.values())

  # check that we have set every item in the array away from DEFAULT_VALUE
  assert DEFAULT_VALUE not in counter.keys()
  # check that we have accounted for every item in the array
  assert total_items_set_in_array == NBR_ITEMS_IN_ARRAY
  # check that we have NBR_OF_PROCESSES of unique keys to confirm that every
  # process did some of the work
  assert len(counter) == NBR_OF_PROCESSES

  raw_input("Press a key to exit...")

我们只创建了一个1维的字节数组,把它转换为一个2维数组,在4个进程间共享数组,并允许它们在相同的内存块上并发处理。这种方法有助你在许多核上搞并行化。然而,要小心对相同数据点的并发存取——如果你想要避免同步的问题,你就不得不在multiprocessing中使用锁,这会拖慢你的代码。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文