Why does ProcessPoolExecutor keep running?

Posted on 2025-01-10 15:30:11

I am trying to use Python's ProcessPoolExecutor to compute some FFTs in parallel; see the following code:

import concurrent.futures
import numpy as np
from scipy.fft import fft

def fuc(sig):
    C = fft(sig,axis=-1) 
    return C

def main():
    P, M, K = 20, 30, 1024
    FKP = np.empty((P, M, K), dtype='cdouble')  # preallocate the complex result array
    fkp = np.empty((P, M, K), dtype='float32')
    fkp = np.random.rand(P, M, K)               # filled here with random test data
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
        results = ex.map(fuc,(fkp[p,m].reshape(1,K) for p in range(P) for m in range(M)))
    FKP = list(results)

if __name__ == '__main__':
    main()

Questions:

  1. Why does the kernel keep busy, yet I do not see 4 worker processes in the Windows Task Manager?
  2. Am I getting the parallel-computed results the right way in the line "FKP = list(results)"?

Comments (1)

花开柳相依 2025-01-17 15:30:11

Q1 :
" why the kernel keeps busy, but I did not see 4 workers from windows task manager? "

A1 :
Let's solve this in code itself :

import os
import time
...
def fuc( sig ):
    print( ( "INF[{0:}]: fuc() starts   "
           + "running in process[{1:}]"
           + "-called-from-process[{2:}]"
             ).format( time.perf_counter_ns(), os.getpid(), os.getppid() )
           )
    C = fft( sig, axis = -1 )
    print( ( "INF[{0:}]: fuc() FFT done "
           + "running in process[{1:}]"
           + "-called-from-process[{2:}]"
             ).format( time.perf_counter_ns(), os.getpid(), os.getppid() )
           )
    return C

This code will self-document when, where, and for how long the FFT part of the plan actually computes. If the 4 workers show up only briefly, if at all, in the Windows Task Manager, that is because each 1×1024 FFT finishes in mere microseconds, so most of the wall-clock time is spent not inside the workers but in the main process, feeding parameters in and collecting results back.


Q2 :
" Am I getting the parallel-computed results the right way in the line "FKP = list(results)"? "

A2 :
Yes, yet it comes at a set of remarkable add-on overhead costs: every SER/COMMS/DES process-to-process border-crossing gets all of its data SER/DES-coded ( pickle.dumps()-like CPU/RAM costs in the [TIME]- and [SPACE]-domains, plus nonzero ipc-p2p transfer times ) :

def Pinf():
    print( ( "NEW[{0:}]: ProcessPoolExecutor process-pool has "
           + "started process[{1:}]"
           + "-called-from-process[{2:}]"
             ).format( time.perf_counter_ns(), os.getpid(), os.getppid() )
           )

def main():
    ...
    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    print( ( "INF[{0:}]: context-manager"
           + 30*"_" + " entry point"
             ).format( time.perf_counter_ns() )
           )
    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    with concurrent.futures.ProcessPoolExecutor( max_workers = 4,
                                                 initializer = Pinf
                                                 ) as ex:
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        print( ( "INF[{0:}]: context-manager"
               + " is to start .map()"
                 ).format( time.perf_counter_ns() )
               )
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        results = ex.map( fuc,
                          ( fkp[p,m].reshape( 1, K )
                            for p   in range( P )
                            for   m in range( M )
                            )
                          )
        ...
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        print( ( "INF[{0:}]: context-manager"
               + " .map() returned / __main__ has received all <_results_>"
                 ).format( time.perf_counter_ns() )
               )
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        pass
    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    print( ( "INF[{0:}]: context-manager"
           + 30*"_" + " exited"
             ).format( time.perf_counter_ns() )
           )
    ...
    print( type( results ) )
    ...
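
The per-task SER/DES price named above is easy to measure in isolation, too. A minimal sketch ( assuming one task payload of the same 1×K float64 shape as the question sends; absolute numbers vary per machine ):

import pickle
import time

import numpy as np

K    = 1024
sig  = np.random.rand( 1, K )                     # one task-sized payload, as sent by .map()

t0   = time.perf_counter_ns()
blob = pickle.dumps( sig )                        # SER-side cost
t1   = time.perf_counter_ns()
back = pickle.loads( blob )                       # DES-side cost
t2   = time.perf_counter_ns()

print( "SER {0:} ns / DES {1:} ns / payload {2:} B".format( t1 - t0, t2 - t1, len( blob ) ) )

Remember this price is paid in both directions of every task ( parameters out, complex results back ), 600 times over.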

For the actual add-on costs of each process-pool process instantiation, see the reported ns-traces. The details are platform-specific, as the { MacOS | Linux | Windows } methods for spawning new processes differ a lot. The same holds across Python versions: recent Py3 releases copy a rather different scope of the calling Python interpreter than was common in Py2 and early Py3.x, where some spawn methods copied the whole stateful calling interpreter, with a complete replica of its data, file descriptors and the like, and thus bore even larger process-instantiation costs, due to all the RAM allocations needed to store the n-many replicas of the calling Python interpreter.
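
The spawn method need not stay platform-default-dependent, either. ProcessPoolExecutor accepts an mp_context argument ( Python 3.7+ ), so the ns-traces can be compared under an explicitly chosen start method. A minimal sketch:

import concurrent.futures
import multiprocessing

def main():
    ctx = multiprocessing.get_context( "spawn" )   # Windows' only option; POSIX also offers "fork" / "forkserver"
    with concurrent.futures.ProcessPoolExecutor( max_workers = 4,
                                                 mp_context  = ctx,
                                                 initializer = Pinf
                                                 ) as ex:
        ...                                        # same instrumented .map() work as above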

Given the scaling :

>>> len( [ ( p, m ) for p in range( P ) for m in range( M ) ] )
600

efficiency matters. Passing just one tuple of ( p_start, p_end, m_start, m_end ) sub-range indices to each of the 4 processes, letting each process run the FFT over its own section of the signal and return a sub-list of FFT results, avoids re-passing the same static data many times in small chunks, and skips ~596 of the 600 trips through the ( CPU-, RAM- and latency-wise ) expensive SER/COMMS/DES ipc-p2p data-passing corridor altogether, as sketched below.
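
A minimal sketch of that batching idea ( fuc_block is a hypothetical helper; to keep the sketch self-contained, the 4 contiguous sub-blocks of fkp are shipped directly, which has the same effect as shipping index tuples to workers that already hold the data ):

import concurrent.futures

import numpy as np
from scipy.fft import fft

def fuc_block( block ):                                      # hypothetical batched worker
    return fft( block, axis = -1 )                           # one vectorised FFT over a whole ( rows, K ) block

def main():
    P, M, K = 20, 30, 1024
    fkp     = np.random.rand( P, M, K )
    blocks  = np.array_split( fkp.reshape( P * M, K ), 4 )   # 4 contiguous sub-ranges instead of 600 rows
    with concurrent.futures.ProcessPoolExecutor( max_workers = 4 ) as ex:
        parts = list( ex.map( fuc_block, blocks ) )          # 4 border-crossings each way, not 600
    FKP = np.vstack( parts ).reshape( P, M, K )              # reassemble in the original ( p, m ) order
    return FKP

if __name__ == '__main__':
    main()

If the one-row worker shall stay as-is, ex.map( fuc, ..., chunksize = 150 ) reaches a similar effect, batching the ipc-p2p traffic under the hood.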

For more details you may like to re-read this and this.
