为什么使用 joblib.Parallel() 时我的代码运行速度比不使用时慢得多?

发布于 2025-01-13 06:17:12 字数 3332 浏览 6 评论 0原文

我刚开始使用 joblib.Parallel() 来加速一些大规模的 numpy.fft 计算。

我遵循这个 示例joblib-web 上展示

使用该示例,我可以在我的计算机上看到以下结果:

Elapsed time computing the average of couple of slices 1.69 s
Elapsed time computing the average of couple of slices 2.64 s
Elapsed time computing the average of couple of slices 0.40 s
Elapsed time computing the average of couple of slices 0.26 s 

它们看起来非常好!然后我将 data[s1].mean() 更改为 np.fft.fft( data[s1] ),请参阅以下代码:

    import numpy as np

    data = np.random.random((int(2**24),))
    window_size = int(256)
    slices = [slice(start, start + window_size)
          for start in range(0, data.size - window_size, window_size)]
    len(slices)

    import time

    def slow_FFT(data, sl):
        return np.fft.fft(data[sl])

    tic = time.time()
    results = [slow_FFT(data, sl) for sl in slices]
    toc = time.time()
    print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))
    np.shape(results)


    from joblib import Parallel, delayed

    tic = time.time()
    results2 = Parallel(n_jobs=4)(delayed(slow_FFT)(data, sl) for sl in slices)
    toc = time.time()
    print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))


    import os
    from joblib import dump, load, Parallel

    folder = './joblib5_memmap'
    try:
        os.mkdir(folder)
    except FileExistsError:
        pass
    data_filename_memmap = os.path.join(folder, 'data_memmap')
    dump(data, data_filename_memmap)
    data = load(data_filename_memmap, mmap_mode='r')

    tic = time.time()
    results3 = Parallel(n_jobs=4)(delayed(slow_FFT)(data, sl) for sl in slices)
    toc = time.time()
    print('\nElapsed time computing the average of couple of slices {:.2f} s\n'
      .format(toc - tic))

    def slow_FFT_write_output(data, sl, output, idx):
        res_ = np.fft.fft(data[sl])
        output[idx,:] = res_

    output_filename_memmap = os.path.join(folder, 'output_memmap')
    output = np.memmap(output_filename_memmap, dtype=np.cdouble,shape= 
       (len(slices),window_size), mode='w+')
    data = load(data_filename_memmap, mmap_mode='r')

    tic = time.time()
    _ = Parallel(n_jobs=4)(delayed(slow_FFT_write_output)
        (data, sl, output, idx) for idx, sl in enumerate(slices))
    toc = time.time()
    print('\nElapsed time computing the average of couple of slices {:.2f} s\n'
      .format(toc - tic))

    print(np.allclose(np.array(results),output))

我没有看到 4 核的加速在“共享内存的可写memmap”中

,首先,我们将评估我们问题上的顺序计算:

Elapsed time computing the average of couple of slices 0.62 s

joblib.Parallel()用于使用4个worker并行计算所有切片的平均值:

Elapsed time computing the average of couple of slices 4.29 s

并行处理是已经比顺序处理快了。还可以通过将数据数组转储到内存映射并将内存映射传递给 joblib.Parallel() 来消除一些开销:

Elapsed time computing the average of couple of slices 1.94 s

共享内存的可写内存映射:

Elapsed time computing the average of couple of slices 1.46 s
True

有人可以帮我“为什么”吗?非常感谢!

I am new to use joblib.Parallel() to speed up some massive numpy.fft calculations.

I follow this example presented on joblib-web

Using the example, I can see following result on my computer:

Elapsed time computing the average of couple of slices 1.69 s
Elapsed time computing the average of couple of slices 2.64 s
Elapsed time computing the average of couple of slices 0.40 s
Elapsed time computing the average of couple of slices 0.26 s 

They look very good! Then I change data[s1].mean() to np.fft.fft( data[s1] ), see following code:

    import numpy as np

    data = np.random.random((int(2**24),))
    window_size = int(256)
    slices = [slice(start, start + window_size)
          for start in range(0, data.size - window_size, window_size)]
    len(slices)

    import time

    def slow_FFT(data, sl):
        return np.fft.fft(data[sl])

    tic = time.time()
    results = [slow_FFT(data, sl) for sl in slices]
    toc = time.time()
    print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))
    np.shape(results)


    from joblib import Parallel, delayed

    tic = time.time()
    results2 = Parallel(n_jobs=4)(delayed(slow_FFT)(data, sl) for sl in slices)
    toc = time.time()
    print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))


    import os
    from joblib import dump, load, Parallel

    folder = './joblib5_memmap'
    try:
        os.mkdir(folder)
    except FileExistsError:
        pass
    data_filename_memmap = os.path.join(folder, 'data_memmap')
    dump(data, data_filename_memmap)
    data = load(data_filename_memmap, mmap_mode='r')

    tic = time.time()
    results3 = Parallel(n_jobs=4)(delayed(slow_FFT)(data, sl) for sl in slices)
    toc = time.time()
    print('\nElapsed time computing the average of couple of slices {:.2f} s\n'
      .format(toc - tic))

    def slow_FFT_write_output(data, sl, output, idx):
        res_ = np.fft.fft(data[sl])
        output[idx,:] = res_

    output_filename_memmap = os.path.join(folder, 'output_memmap')
    output = np.memmap(output_filename_memmap, dtype=np.cdouble,shape= 
       (len(slices),window_size), mode='w+')
    data = load(data_filename_memmap, mmap_mode='r')

    tic = time.time()
    _ = Parallel(n_jobs=4)(delayed(slow_FFT_write_output)
        (data, sl, output, idx) for idx, sl in enumerate(slices))
    toc = time.time()
    print('\nElapsed time computing the average of couple of slices {:.2f} s\n'
      .format(toc - tic))

    print(np.allclose(np.array(results),output))

I do not see speedup with 4 cores in "Writable memmap for shared memory"

First, we will evaluate the sequential computing on our problem:

Elapsed time computing the average of couple of slices 0.62 s

joblib.Parallel() is used to compute in parallel the average of all slices using 4 workers:

Elapsed time computing the average of couple of slices 4.29 s

Parallel processing is already faster than the sequential processing. It is also possible to remove a bit of overhead by dumping the data array to a memmap and pass the memmap to joblib.Parallel():

Elapsed time computing the average of couple of slices 1.94 s

Writable memmap for shared memory:

Elapsed time computing the average of couple of slices 1.46 s
True

Can someone help me "why"? Many thanks in advance!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(17

不忘初心 2025-01-20 06:17:12

问:
“有人可以帮我“为什么”吗?”

答:
当然,您的代码已经获得了巨大的附加开销成本,并且它不断重复收集65536 x(很多次!!):
SER / xfer / DES附加成本( [SPACE]-wise 作为 RAM 分配 + [TIME]-wise CPU + RAM-I/O 延迟)
一次又一次地序列化 + 传输 p2p + 反序列化一块 1.1 [GB] 的相同数据 RAM 到 RAM

pass;                                        tic = time.time()
#

Q :
" Can someone help me "why"? "

A :
Sure, your code has acquired IMMENSE add-on overhead-costs, and it keeps repeating collecting 65536 x ( that many times!!) the :
SER / xfer / DES add-on costs ( [SPACE]-wise as RAM allocations + [TIME]-wise CPU + RAM-I/O delays )
to again and again serialise + transfer p2p + deserialise a block of 1.1 [GB] of the same data RAM to RAM

pass;                                        tic = time.time()
#

眸中客 2025-01-20 06:17:12
九公里浅绿 2025-01-20 06:17:12

| # CRITICAL SECTION
results3 = Parallel( n_jobs = 4 # 1.spawn 4 process replicas
)( delayed( slow_FFT # + keep
)( data, # feeding them with
sl ) # <_1.1_GB_data_> + <_sl_>-Objects
for sl # for each slice
in slices # from slices
) # again and again 65k+ times
#

| # CRITICAL SECTION
results3 = Parallel( n_jobs = 4 # 1.spawn 4 process replicas
)( delayed( slow_FFT # + keep
)( data, # feeding them with
sl ) # <_1.1_GB_data_> + <_sl_>-Objects
for sl # for each slice
in slices # from slices
) # again and again 65k+ times
#

梦亿 2025-01-20 06:17:12
无需解释 2025-01-20 06:17:12

| # CRITICAL SECTION +72 [TB] DATA-FLOW RAM-I/O PAIN
pass; toc = time.time()

这种使用迭代器语法糖的“低成本”SLOC 会因为做大量非生产性工作而受到惩罚,因此没有做唯一有用的工作。

重构策略,仅支付一次 SER/xfer/DES 附加成本(在 n_jobs 进程实例化期间,无论如何都会完成)
并且从不传递数据< /code>,在所有复制的 Python 解释器进程中已经“已知”。最好制定临时迭代器,以便在大块上自主地在“远程”工作人员内部工作,通过智能调用签名定义,仅被调用一次
(而不是65536 x

def smartFFT( aTupleOfStartStopShiftINDEX = ( 0, -FFT_WINDOW_SIZE, 1 ) ):
    global FFT_WINDOW_SIZE
    global DATA_IN
    #------------------------
    # compute all FFT-results
    #         for "known" DATA_IN,
    #         for each block from aTupleOfStartStopShiftINDEX[0]
    #                        till aTupleOfStartStopShiftINDEX[1]
    #                 shifting by aTupleOfStartStopShiftINDEX[2]
    #                     of size FFT_WINDOW_SIZE
    #------------------------prefer powers of Numpy vectorized code
    #------------------------best with using smart-striding-tricks
    return block_of_RESULTS_at_once

接下来就是:

pass;                                        tic = time.time()
#

| # CRITICAL SECTION +72 [TB] DATA-FLOW RAM-I/O PAIN
pass; toc = time.time()

This "low-cost" SLOC with using an iterator syntax sugar is punished by doing awfully lot unproductive work, thus not doing the only useful one.

Refactor the strategy, to pay SER/xfer/DES add-on costs just once (during the instantiation of the n_jobs-processes, which is done anyway)
and never pass data, that are already "known" inside all the n_jobs copied Python Interpreter processes. Best formulate ad-hoc iterators to work inside the "remote" workers autonomously on large blocks, defined via the smart call-signature, being called just once
( and not as many as 65536 x )

def smartFFT( aTupleOfStartStopShiftINDEX = ( 0, -FFT_WINDOW_SIZE, 1 ) ):
    global FFT_WINDOW_SIZE
    global DATA_IN
    #------------------------
    # compute all FFT-results
    #         for "known" DATA_IN,
    #         for each block from aTupleOfStartStopShiftINDEX[0]
    #                        till aTupleOfStartStopShiftINDEX[1]
    #                 shifting by aTupleOfStartStopShiftINDEX[2]
    #                     of size FFT_WINDOW_SIZE
    #------------------------prefer powers of Numpy vectorized code
    #------------------------best with using smart-striding-tricks
    return block_of_RESULTS_at_once

and next just :

pass;                                        tic = time.time()
#

七婞 2025-01-20 06:17:12
忆离笙 2025-01-20 06:17:12

| # CRITICAL SECTION
results3 = Parallel( n_jobs = 4 # 1.spawn 4 process replicas
)( delayed( smartFFT # + keep
)( iTup ) # feeding them with
for iTup # just iTup tuple
in iTuples#
) # just n_jobs ~ 4 times
#

| # CRITICAL SECTION
results3 = Parallel( n_jobs = 4 # 1.spawn 4 process replicas
)( delayed( smartFFT # + keep
)( iTup ) # feeding them with
for iTup # just iTup tuple
in iTuples#
) # just n_jobs ~ 4 times
#

So要识趣 2025-01-20 06:17:12
埖埖迣鎅 2025-01-20 06:17:12

| # CRITICAL SECTION +0 [kB] DATA-FLOW
pass; toc = time.time()


那么为什么呢?

您只是支付了太多的附加管理费用,根本不可能实现加速,正如阿姆达尔定律所解释的,一旦我们不再闭上眼睛或忘记所有附加成本,这些附加成本是在“扩展”代码上累积的,以便开始工作,至少以某种方式并行(工作的原子性是第二个重要更新古典的公式,不要将其用于反对跨现实世界设备(处理器或处理器网络)的工作包流的本质。

支付的成本比接收的加速更多 - 这是“原因”部分。没关系 - 很多次重复的缺陷 - 只需检查如何经常 )

| # CRITICAL SECTION +0 [kB] DATA-FLOW
pass; toc = time.time()


So WHY?

You simply paid so much on add-on overhead costs, that there is no speed-up possible at all, as Amdahl's Law has explained, once we stop closing eyes from or forgetting about all the add-on costs, accrued on the "extending" the code so as to start working, at least somehow, in parallel ( the atomicity of work being the second important update of the classical formula, not to use it against the nature of the flow of the work-packages across the real-world devices ( processors or networks of processors ) ).

Paying more costs than receiving in speed-up - this is "The WHY" part. Never mind - many times repeated flaw - just can check how often )

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文