Why does my code run much slower with joblib.Parallel() than without it?
I am new to using joblib.Parallel() to speed up some massive numpy.fft calculations.
I followed this example presented on the joblib web site.
Using that example, I see the following results on my computer:
Elapsed time computing the average of couple of slices 1.69 s
Elapsed time computing the average of couple of slices 2.64 s
Elapsed time computing the average of couple of slices 0.40 s
Elapsed time computing the average of couple of slices 0.26 s
They look very good! Then I changed data[s1].mean() to np.fft.fft( data[s1] ); see the following code:
import numpy as np
data = np.random.random((int(2**24),))
window_size = int(256)
slices = [slice(start, start + window_size)
          for start in range(0, data.size - window_size, window_size)]
len(slices)

import time
def slow_FFT(data, sl):
    return np.fft.fft(data[sl])

tic = time.time()
results = [slow_FFT(data, sl) for sl in slices]
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))
np.shape(results)

from joblib import Parallel, delayed
tic = time.time()
results2 = Parallel(n_jobs=4)(delayed(slow_FFT)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))

import os
from joblib import dump, load, Parallel
folder = './joblib5_memmap'
try:
    os.mkdir(folder)
except FileExistsError:
    pass

data_filename_memmap = os.path.join(folder, 'data_memmap')
dump(data, data_filename_memmap)
data = load(data_filename_memmap, mmap_mode='r')
tic = time.time()
results3 = Parallel(n_jobs=4)(delayed(slow_FFT)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s\n'
      .format(toc - tic))

def slow_FFT_write_output(data, sl, output, idx):
    res_ = np.fft.fft(data[sl])
    output[idx, :] = res_

output_filename_memmap = os.path.join(folder, 'output_memmap')
output = np.memmap(output_filename_memmap, dtype=np.cdouble,
                   shape=(len(slices), window_size), mode='w+')
data = load(data_filename_memmap, mmap_mode='r')
tic = time.time()
_ = Parallel(n_jobs=4)(delayed(slow_FFT_write_output)(data, sl, output, idx)
                       for idx, sl in enumerate(slices))
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s\n'
      .format(toc - tic))
print(np.allclose(np.array(results), output))
I do not see a speedup with 4 cores, not even in the "Writable memmap for shared memory" case.
First, we evaluate the sequential computation on our problem:
Elapsed time computing the average of couple of slices 0.62 s
joblib.Parallel() is then used to compute the FFT of all slices in parallel, using 4 workers:
Elapsed time computing the average of couple of slices 4.29 s
Parallel processing is not faster than the sequential processing here. It is also possible to remove a bit of overhead by dumping the data array to a memmap and passing the memmap to joblib.Parallel():
Elapsed time computing the average of couple of slices 1.94 s
Writable memmap for shared memory:
Elapsed time computing the average of couple of slices 1.46 s
True
Can someone help me understand the "why"? Many thanks in advance!
Answer:
Sure, your code has acquired IMMENSE add-on overhead costs, and it keeps collecting, 65536 x (that many times!!), the SER / xfer / DES add-on costs ([SPACE]-wise as RAM allocations + [TIME]-wise as CPU + RAM-I/O delays) to again and again serialise + transfer p2p + deserialise a block of 1.1 [GB] of the same data, RAM to RAM:
#____________________________________________________________ # CRITICAL SECTION
results3 = Parallel( n_jobs = 4          #  1.spawn 4 process replicas
                     )( delayed( slow_FFT    # + keep
                                 )( data,    #   feeding them with
                                    sl )     #   <_1.1_GB_data_> + <_sl_>-Objects
                        for sl               #   for each slice
                        in slices            #       from slices
                        )                    #   again and again 65k+ times
#____________________________________________________________ # CRITICAL SECTION +72 [TB] DATA-FLOW RAM-I/O PAIN
pass;    toc = time.time()
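The size of that per-call payload can be checked directly. A minimal sketch, assuming the array argument is plain-pickled for each dispatched task (joblib's loky backend may auto-memmap large arrays instead, so treat this as an upper bound on the per-call SER/DES cost; it is not part of the original answer):

import pickle, time
import numpy as np

data = np.random.random((int(2**24),))               # same array as in the question

tic = time.time()
payload = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
_back   = pickle.loads(payload)                      # one SER + DES round-trip
toc = time.time()

print('payload size : {:.1f} MB per dispatched call'.format(len(payload) / 1e6))
print('SER+DES time : {:.3f} s per call, before any transfer cost'.format(toc - tic))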
This "low-cost" SLOC with using an iterator syntax sugar is punished by doing awfully lot unproductive work, thus not doing the only useful one.
Refactor the strategy, to pay SER/xfer/DES add-on costs just once (during the instantiation of the
n_jobs
-processes, which is done anyway)and never pass
data
, that are already "known" inside all then_jobs
copied Python Interpreter processes. Best formulate ad-hoc iterators to work inside the "remote" workers autonomously on large blocks, defined via the smart call-signature, being called just once( and not as many as
65536 x
)and next just :
#____________________________________________________________ # CRITICAL SECTION
results3 = Parallel( n_jobs = 4          #  1.spawn 4 process replicas
                     )( delayed( smartFFT     # + keep
                                 )( iTup )    #   feeding them with just iTup tuple
                        for iTup              #   for each iTup tuple
                        in iTuples            #
                        )                     #   just n_jobs ~ 4 times
#____________________________________________________________ # CRITICAL SECTION +0 [kB] DATA-FLOW
pass;    toc = time.time()
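The answer does not spell out smartFFT or iTuples. A minimal sketch of one possible realisation, reusing data_filename_memmap, window_size and slices from the question's script and splitting the slices into one contiguous block per worker (the block boundaries and the flattening at the end are illustrative choices, not part of the original answer):

import numpy as np
from joblib import Parallel, delayed, load

def smartFFT(iTup):                                    # one call per block of slices
    lo, hi = iTup                                      # this worker's slice-index range
    data = load(data_filename_memmap, mmap_mode='r')   # re-open the read-only memmap locally
    return [np.fft.fft(data[sl]) for sl in slices[lo:hi]]

n_blocks = 4                                           # ~ n_jobs, so only ~4 dispatches
bounds   = np.linspace(0, len(slices), n_blocks + 1, dtype=int)
iTuples  = list(zip(bounds[:-1], bounds[1:]))

chunks   = Parallel(n_jobs=4)(delayed(smartFFT)(iTup) for iTup in iTuples)
results4 = [fft for chunk in chunks for fft in chunk]  # flatten back to one list of FFTs

Here the big data array is never passed per task; each worker opens the read-only memmap itself, so the add-on dispatch cost is paid roughly n_jobs times instead of 65536 times.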
So WHY?
You simply paid so much in add-on overhead costs that no speed-up is possible at all, as Amdahl's Law explains once we stop closing our eyes to, or forgetting about, all the add-on costs accrued by "extending" the code so that it starts to work, at least somehow, in parallel (the atomicity of work being the second important update to the classical formula, so that it is not used against the nature of the flow of work-packages across real-world devices (processors or networks of processors)).
Paying more in costs than is received in speed-up - this is "The WHY" part. Never mind - it is a flaw repeated many times - one can just check how often )
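As a back-of-the-envelope illustration of that overhead-strict reading of Amdahl's Law, using the timings reported in the question (0.62 s sequential, 4.29 s with 4 workers) and treating everything beyond the FFT work itself as add-on overhead - a sketch, not part of the original answer:

def speedup(T_serial, T_parallelisable, n_workers, T_overhead):
    """Overhead-aware Amdahl speedup: original run time over parallelised run time."""
    T1 = T_serial + T_parallelisable
    TN = T_serial + T_parallelisable / n_workers + T_overhead
    return T1 / TN

# question's numbers: all 0.62 s of FFT work is parallelisable,
# the remaining 4.29 - 0.62/4 ~ 4.13 s is pure add-on overhead
print(speedup(0.0, 0.62, 4, 4.29 - 0.62 / 4))   # ~ 0.14  ->  a ~7x slow-down, not a speed-up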