Can performance be improved when using multiprocessing on an AWS EC2 machine?
My algorithm computes an average over 1,000,000 separate samples, and therefore uses Pool from Python's multiprocessing library (I tried once with map and once with imap).
On a computer with 28 cores it ran at ~350 iterations/minute.
When I moved to AWS EC2, which has 192 CPU cores, it improved only by a factor of 2: ~700 iterations/minute.
I wonder if there is any good practice I am missing?
I tried:
- monitoring CPU usage: all cores seem to be used
- monitoring memory: only ~15 GB of the 750 GB of RAM that the EC2 instance has is used
- using map instead of imap: no improvement
- the sweet spot seems to be giving Pool a processes parameter of 53 instead of 192 (which gives ~950 iterations/minute)
The code looks like:
import os
NUM_THREADS = "1"
os.environ["OMP_NUM_THREADS"] = NUM_THREADS
os.environ["OPENBLAS_NUM_THREADS"] = NUM_THREADS
os.environ["MKL_NUM_THREADS"] = NUM_THREADS
os.environ["VECLIB_MAXIMUM_THREADS"] = NUM_THREADS
os.environ["NUMEXPR_NUM_THREADS"] = NUM_THREADS
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool
import time
def sample_in_pool(n_r_c_touple):
    # calculations using matrices, multiplications, numpy - extensive use of cpu
    return (xsyndrome, zsyndrome,
            np.array(exact_choi_matrix),
            np.array(correction_exact),
            np.array(diamond_norms))

if __name__ == "__main__":
    def run_test(rows, cols, nsamples, ncores):
        starttime = time.time()
        with Pool(processes=mp.cpu_count()) as pool:
            result = pool.map(sample_in_pool, [(noise_pepso, rows, cols)] * nsamples)
            xsyndrome = [x[0] for x in result]
            zsyndrome = [x[1] for x in result]
            exact_choi_matrix = [x[2] for x in result]
            correction_exact = [x[3] for x in result]
            diamond_norms = [x[4] for x in result]
            pool.terminate()
            pool.close()
        samples_so_far = nsamples
        "\nSamples/min (min): " + str(round(samples_so_far / ((time.time() - starttime) / 60)))

    def run_batch_of_d_N_composite(d, composite, N, nsamples):
        for i in range(N):
            diamond_mean, plot_str = run_test(rows=d, cols=d,
                                              nsamples=nsamples, ncores=ncores,
                                              noise_type="Composite",
                                              rot_angle=0.1,
                                              composite=composite, big_omega_error=False,
                                              test_batch=test_batch_folder, computer_name=computer)

    run_batch_of_d_N_composite(d=7, composite=False, N=1, nsamples=1500)
First of all, multiprocessing is known to cause scalability issues due to the expensive inter-process communication (IPC). The only way to reduce this overhead is to reduce the amount of data transferred between processes (a centralized, slow operation), possibly by operating on shared memory. Multithreading can theoretically mitigate this problem, but the Global Interpreter Lock (GIL) of the mainstream CPython interpreter prevents any speed-up unless it is released. This is the case for many Numpy operations, but it will still not scale perfectly due to theoretical limitations (see later). If your computation only performs Numpy operations, I strongly advise you not to use multiprocessing but the fork-join operations of Numba/Cython instead (they use multithreading, but without the GIL issue, so you pay neither the cost of creating processes nor the cost of IPC).
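For instance, a minimal sketch of this fork-join style using Numba's prange (the function name and the loop body here are only placeholders for the per-sample NumPy work done in sample_in_pool, which would have to be rewritten in a Numba-compatible way):
import numpy as np
from numba import njit, prange

@njit(parallel=True)
def mean_over_samples(nsamples, size):
    total = 0.0
    for i in prange(nsamples):                      # fork-join parallel loop: no GIL, no IPC
        m = np.full((size, size), 1.0 / (i + 1.0))  # placeholder for the heavy per-sample work
        total += np.sum(m * m)                      # scalar reduction across samples
    return total / nsamples

print(mean_over_samples(1_000_000, 7))              # uses all cores of one machine, no processes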
Moreover, no computation can scale perfectly. Work that contains even a small sequential portion does not scale well on 192 cores: this is known as Amdahl's law. Some resources, like the RAM, are shared between cores and can be saturated (see this recent post for example).
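To make this concrete, Amdahl's law gives speedup(N) = 1 / (s + (1 - s) / N) for a sequential fraction s, so even a 2% sequential portion (s = 0.02) caps the speedup on 192 cores at about 1 / (0.02 + 0.98 / 192) ≈ 40.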
Additionally, computing machines are not just a set of independent cores; they are far more complex. Indeed, modern machines with a lot of cores are NUMA systems. On such machines, cores are grouped into NUMA nodes that can access their own memory efficiently but access the memory of other nodes less efficiently. Each NUMA node has its own memory, which can easily be saturated if all the memory is allocated on the same node. Thus, you need to balance data across several nodes to avoid this problem. This is done by controlling the NUMA allocation policy. The default policy is generally to perform a local allocation during the first touch of a memory page, so you need to perform local accesses. Multiprocessing tends to behave well regarding this allocation policy, assuming processes do not move between cores. Thus, you should bind processes to cores to improve data locality. Furthermore, most physical cores contain multiple hardware threads sharing the same resources (so it is generally not much faster to run 2 processes on the same core if they are doing optimized, numerically intensive operations). This is called SMT, and it is useful to speed up latency-bound computations (e.g. IO-bound codes, data transpositions, unoptimized scalar codes). This is a very complex topic; for more information, please read this page and possibly some high-performance computing books.
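A minimal sketch of binding each pool worker to its own core from Python (assuming a Linux host: os.sched_setaffinity is not available on all platforms, and the function name and worker count below are only illustrative; command-line tools like taskset or numactl can achieve the same thing):
import os
import multiprocessing as mp

def pin_to_next_core(counter):
    # Pool initializer: each worker atomically takes the next core index and
    # binds itself to it, so it never migrates between cores (or NUMA nodes).
    with counter.get_lock():
        core = counter.value
        counter.value += 1
    os.sched_setaffinity(0, {core})

if __name__ == "__main__":
    counter = mp.Value("i", 0)
    with mp.Pool(processes=96,                 # e.g. one worker per physical core
                 initializer=pin_to_next_core,
                 initargs=(counter,)) as pool:
        pass                                   # pool.map(...) as in the question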
In fact, the EC2 machine has 2 sockets of AMD EPYC 7R13 with 48 cores each and 2 hardware threads per core. This is a good example of a complex NUMA architecture with SMT. Thus, in practice, you should expect a speed-up smaller than 96 (assuming SMT does not help much, which is likely for Numpy codes). Because Numpy performs a lot of temporary array copies, the code can certainly be memory-bound and saturate the RAM (see this post or this one for example). Again, Numba/Cython can help to reduce the number of temporary arrays created and make the whole computation less memory-bound (and thus eventually scale better).
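As a small illustration of where the temporaries come from (plain NumPy shown here with illustrative array sizes; this is not a substitute for a Numba/Cython rewrite, only a way to remove some of the copies):
import numpy as np

a = np.random.rand(512, 512)
b = np.random.rand(512, 512)
buf = np.empty_like(a)

c = a @ b + a             # allocates a temporary for (a @ b) and a new array for the sum

np.matmul(a, b, out=buf)  # writes the product into a preallocated buffer instead
buf += a                  # adds in place, so no extra array is allocated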
There could be many possible reasons why your code might not be performing as well as you'd want it to, but in this answer I will focus on one such area: data serialization in multiprocessing.
Whenever you create a pool, all the arguments need to be serialized so that they can be used by another process (this is also called pickling). Multiprocessing uses pickle to do so. This serialization happens for every task that the pool creates, rather than once per process. For example, consider the code below, where we subclass the builtin str class and override the dunder methods __getstate__ and __setstate__. These methods are used to tell Python (and pickle) how to serialize/deserialize your object. We leave a helpful print statement in them to see what is happening when we create our pool.
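A minimal sketch of such a demonstration (the exact listing is assumed from the description above; the class and function names are illustrative):
from multiprocessing import Pool

class NoisyStr(str):
    # Subclass of the builtin str that reports every (un)pickling call.
    def __getstate__(self):
        print("pickling", flush=True)
        return {}

    def __setstate__(self, state):
        print("unpickling", flush=True)

def work(s):
    # Return a plain int so the result itself does not add extra (un)pickling calls.
    return len(s)

if __name__ == "__main__":
    args = [NoisyStr("hello")] * 5      # 5 tasks, all referring to the same object...
    with Pool(processes=2) as pool:     # ...but only 2 worker processes
        pool.map(work, args)
    # Expected output: "pickling" printed 5 times (in the parent process) and
    # "unpickling" printed 5 times (in the workers): one pair per task.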
As you can see, there were a total of 5 pickling and 5 unpickling calls made, even though the pool was started with only 2 processes. Again, this is because the Pool class pickles these arguments per task, rather than per worker.
This obviously adds extra overhead, which scales with the complexity of the objects that need to be pickled (keep in mind that the result from the worker is pickled as well). Therefore, if you are passing "big matrices, with a size in million of numbers" and creating a million tasks, these extra pickling/unpickling calls will take longer as well (more complex objects) and impact your performance.
So what's the solution?
Ideally, we want to restrict the number of pickling calls to at most the number of workers we create (fewer than that would be impossible). To do this, we use multiprocessing.Process instead of Pool (see this answer for a detailed demonstration of pickling only once per worker using Process). The gist of the approach is that instead of creating a few workers and passing them a lot of tasks through a pool, we create a few intermediary workers using Process and pass each of them a list of tasks, which they can execute with the original worker function in their own process, without needing to pickle/unpickle each task. This is made much clearer in the code below. But before that, coming back to your use case: since the tasks are mostly CPU-bound, let's assume the optimal number of processes we can create is 8 (this will be different for your machine). Therefore, if we can limit the number of serialization calls to 8, exactly how much of a difference can we expect? Let's check using the code below:
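A sketch of this comparison (the names main_worker, indirect_worker, my_list and args, and the sizes, follow the description in the next paragraph; the rest of the listing is assumed and the sizes can be reduced for a quick test):
import time
from multiprocessing import Pool, Process

def main_worker(my_list):
    # The actual task: ignores its (large) argument and returns 1000 numbers.
    return list(range(1000))

def indirect_worker(args_portion):
    # Intermediary worker: receives its whole portion of the tasks up front,
    # so its arguments are pickled only once, when the Process is started.
    start = time.time()
    for arg in args_portion:
        main_worker(arg)                 # results stay in this process (not pickled back)
    print(f"process finished in {time.time() - start:.1f}s")

if __name__ == "__main__":
    n_tasks, n_procs = 80_000, 8
    my_list = list(range(80_000))        # large argument, to make pickling expensive
    args = [my_list] * n_tasks

    # Pool version: every task is pickled and sent to a worker separately.
    start = time.time()
    with Pool(processes=n_procs) as pool:
        pool.map(main_worker, args)
    print(f"Pool took {time.time() - start:.1f}s")

    # Process version: 8 intermediary workers, each given one slice of args up front.
    chunk = n_tasks // n_procs
    procs = [Process(target=indirect_worker,
                     args=(args[i * chunk:(i + 1) * chunk],))
             for i in range(n_procs)]
    start = time.time()
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"Processes took {time.time() - start:.1f}s")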
In this code, we first use a Pool of 8 processes to complete 80,000 tasks. The actual task is executed by main_worker, which just returns an arbitrary list of 1000 numbers. An arbitrary list my_list is also passed to main_worker as the argument. The only thing relevant about this list is that it is a fairly large list of 80,000 numbers, which was my attempt to make the data that needs to be pickled larger. After the pool finishes executing its tasks, the same argument list args used for the pool (a list of 80,000 lists of 80,000 numbers) is used to start 8 indirect_workers with multiprocessing.Process. Each indirect_worker receives a portion of args, which it uses to run main_worker. Therefore, by using the "process" setup we are only creating 8 workers, and hence only pickling/unpickling 8 times. Here is the result of our test:
Now keep in mind that we had 8 processes running concurrently, therefore there are 8 prints for them. Assuming the processes took 11.0s to start and finish (a little time should be added, since starting the processes themselves wasn't accounted for), that leaves us with a straight 33% improvement in speed. Keep in mind, however, that the actual improvement will be smaller if you are not using shared memory, since in the indirect_worker we were not pickling the results to send them back to the main process. With that said, this improvement was obtained with fairly simple objects as the arguments that needed to be pickled/unpickled, and I assume it will scale as the complexity of the arguments grows. In fact, if it is feasible for your data structures, you can use multiprocessing.shared_memory with numpy in the indirect_worker to store the return values and get even better speed benefits over the pool's built-in serialization of return values, since you won't have to pickle/unpickle them at all!
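A rough sketch of that idea (the shape, dtype and sizes here are assumed purely for illustration): each indirect_worker writes its results into its own slice of a shared array, and the main process reads them back without any serialization.
import numpy as np
from multiprocessing import Process, shared_memory

N_WORKERS, RESULTS_PER_WORKER = 8, 10_000

def indirect_worker(shm_name, worker_idx):
    shm = shared_memory.SharedMemory(name=shm_name)
    results = np.ndarray((N_WORKERS, RESULTS_PER_WORKER), dtype=np.float64, buffer=shm.buf)
    for i in range(RESULTS_PER_WORKER):
        results[worker_idx, i] = float(i)      # stand-in for the real per-task result
    del results                                # release the view before closing
    shm.close()

if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True,
                                     size=N_WORKERS * RESULTS_PER_WORKER * 8)
    procs = [Process(target=indirect_worker, args=(shm.name, i))
             for i in range(N_WORKERS)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    results = np.ndarray((N_WORKERS, RESULTS_PER_WORKER), dtype=np.float64, buffer=shm.buf)
    print(results.mean())                      # read all results without pickling anything
    del results
    shm.close()
    shm.unlink()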
Note: I noticed this line in your code in run_test:
result = pool.map(sample_in_pool, [(noise_pepso, rows, cols)] * nsamples)
Even though your RAM usage isn't very high, it is probably better to pass a generator as the iterable rather than storing copies of the same object in memory.
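For example (a sketch of that suggestion applied to the line above; note that pool.map still materializes the iterable into a list internally, whereas pool.imap, which the question already tried, consumes it lazily):
result = pool.map(sample_in_pool, ((noise_pepso, rows, cols) for _ in range(nsamples)))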