大数据集、ProcessPoolExecutor 问题

发布于 2025-01-10 22:47:35 字数 2305 浏览 2 评论 0原文

问题 - ProcessPoolExecutor 没有提高速度。由 tqdm 确认

了解了足够的 Python 知识,可以复制和/或编写一个可以运行的程序。每个文件大约需要 40 秒来加载 -> 过滤 -> 写入。我有大约 6,800 个文件需要处理,并且想要一个更好的版本来使用我所有的处理能力(6 核),我尝试编写该版本(如下)。所述版本产生,但比我原来的功能稍慢:

from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer

decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY+'*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers

def load_decode_filter(file):
    '''`
     Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open(file, 'rb') as ins:
        for bufr_message in generate_bufr_message(
                decoder,ins.read()):
            input_list = FlatJsonRenderer().render(bufr_message)[3][2] #necessary for [mask] to function
            mask = [obj for obj in input_list if ((PHI_MAX > obj[
                12] > PHI_MIN) & (LAMBDA_MAX > obj[13] > LAMBDA_MIN))]
            output_message.extend(mask)
        return output_message

def main(files_in):
    '''
    attempt to intiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor(max_workers=6) as executor:
        with tqdm(range(len(files_in)), desc='files loaded',
                  position=0) as progress:
            futures = []
            for file in files_in:
                future = executor.submit(load_decode_filter(file), file)
                future.add_done_callback(lambda p: progress.update())
                futures.append(future)
            results = []
            for future in futures:
                result = future.result()
                results.append(result)
    with open(DIRECTORY+'bufrout.json', 'w', encoding='utf-8') as f_o:
        dump(results, f_o)

if __name__ == '__main__':
    main(files)

我希望至少减少每个文件的处理时间。


更新,结束:
首先,我要感谢所有发表评论的人和回答者(我太新了,无法投票)。似乎有意义地提高效率的唯一方法是从一开始就不要解码并从原位缓冲区数据中获取我想要的东西,这根本超出了我目前的能力(这是我第一次接触任何类型的代码)。


我计划(目前)尽我所能运行我的初始版本(f.bufr 输入,f.bufr_.txt 输出),我将在每次“运行”后将处理后的文件移动到子目录。一线希望是我已经学到了足够多的知识,我将能够编写一个程序将所有文本输出合并到一个文件中。再次感谢。

PROBLEM - ProcessPoolExecutor hasn't increased speed. Confirmed by tqdm

Learned enough about python to copy and/or write a program that works. each file takes ~40 seconds to load->filter->write. I have ~6,800 files to work through and want a better version which uses all my processing power (6 cores), I tried to write that version (below). Said version produces, however slightly slower than my original function:

from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer

decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY+'*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers

def load_decode_filter(file):
    '''`
     Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open(file, 'rb') as ins:
        for bufr_message in generate_bufr_message(
                decoder,ins.read()):
            input_list = FlatJsonRenderer().render(bufr_message)[3][2] #necessary for [mask] to function
            mask = [obj for obj in input_list if ((PHI_MAX > obj[
                12] > PHI_MIN) & (LAMBDA_MAX > obj[13] > LAMBDA_MIN))]
            output_message.extend(mask)
        return output_message

def main(files_in):
    '''
    attempt to intiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor(max_workers=6) as executor:
        with tqdm(range(len(files_in)), desc='files loaded',
                  position=0) as progress:
            futures = []
            for file in files_in:
                future = executor.submit(load_decode_filter(file), file)
                future.add_done_callback(lambda p: progress.update())
                futures.append(future)
            results = []
            for future in futures:
                result = future.result()
                results.append(result)
    with open(DIRECTORY+'bufrout.json', 'w', encoding='utf-8') as f_o:
        dump(results, f_o)

if __name__ == '__main__':
    main(files)

I was hoping to at least cut processing time per file.


Update, Closing:
First of all, I'd like to thank everyone who commented as well as the answerer (I'm too new to upvote). Seems like the only way to meaningfully increase efficiency would be to never decode in the first place and take what I want from in-situ bufr data, this is simply beyond my current ability (it is my first exposure to code of any kind).

I plan to (am currently) running my initial version (f.bufr in, f.bufr_.txt out) as I am able, I'll move processed files to subdirectory after each "run". Silver lining is I've learned enough doing this that I'll be able to make a program to combine all text output into one file. Thanks again.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

烟酒忠诚 2025-01-17 22:47:35

问:
“问题 - ProcessPoolExecutor 没有提高速度。已由 tqdm 确认”

答:< br>不,
恕我直言,
您的主要问题不是 ProcessPoolExecutor() 实例的效率,而是
您的主要问题是选择性能/效率(几乎) 反模式,Python,Windows 操作系统领域中的更多 Python 子进程将严重惩罚你,让你等待大约 75 个小时收集所有结果(如果处理管道确实按照您期望的方式进行,我无法判断,但猜测它不会...出于下面列出的原因)

怀疑#1:
最好避免 75产生无意义输出的时间:

鉴于记录的标准 Py3 concurrent.futures.Executor()-instance .submit()-method 的调用签名,您的代码不符合此规范。

作为调用方的 main() 不是传递对函数的引用,而是首先对 6800 个文件中的每一个执行完整的、纯的[SERIAL] METOP 工作包处理(产生一些昂贵的收集的巨大消息列表),然后(与传递对函数/就地 lambda 运算符的引用的记录要求相反)再次非常巨大RAM/CPU/TIME 费用,SER/sent/DES 传输到 Executor 管理的工作进程池之一(我怀疑它在收到列表后能够做任何合理的事情,而不是一个函数(计划在这样的远程进程中执行,根据调用签名指定的参数)。哎哟...

def main( files_in ):
    '''                                                                 __doc__
    attempt to intiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor( max_workers = 6
                              )  as executor: #---------------------------# eXe CONTEXT-mgr
        
        with tqdm( range( len( files_in ) ),
                   desc     = 'files loaded',
                   position = 0
                   ) as progress: #---------------------------------------# pro CONTEXT-in-CONTEXT-mgr
            
            futures = []
            for file in files_in: #---------------------------------------#     LUXURY of top-level iterator, commanding  6800x times a pool of workers
                future = executor.submit( load_decode_filter( file ), #---#     ??? WHY CALC THE RESULT BEFORE passing it to the .submit() ???
                                                              file    #---#         std PARA
                                                              )
                future.add_done_callback( lambda p: progress.update() )   #     LUXURY of tdqm() for showing 75-hours of work ???
                futures.append( future ) #--------------------------------#     LUXURY of no performance gain
            
            results = []
            for future in futures:
                result = future.result()
                results.append( result ) #--------------------------------#     LUXURY of adverse performance gain
    
    with open( DIRECTORY + 'bufrout.json', 'w',
               encoding = 'utf-8'
               ) as f_o: #------------------------------------------------# f_o CONTEXT-mgr
        dump( results, f_o )

怀疑#2:
更好地避免任何性能-有辱人格的语法构造函数,
如果性能是要实现的真正目标:

避免键入一种容易实现的 SLOC-s 的任何和所有罪恶,这看起来“性感”,但已经得到了报酬巨大的附加管理费用。

设计流程,以便我们可以通过延迟屏蔽来提高端到端处理时间,在可能的情况下(文件 I/O 是一个经典案例)并避免任何可简化的步骤(创建命名变量(有时永远不会使用 ) 与罪 ) 类似。

假设您在 Windows 操作系统内部运行,您的(虽然是隐藏的)子进程实例化成本是所有其他情况中最高的 - Windows 操作系统将生成 Python 解释器进程的完整自上而下的副本,其中所有数据结构等,因此,如果这导致您的物理 RAM 变得“过度拥挤”,操作系统将开始(在剩下的 75 小时内……)一场令人讨厌的殴打战争虚拟内存管理的文件 I/O 传输 (大约 10.000 倍的延迟) 从 RAM 到磁盘&从磁盘到 RAM。这将有效地损害任何其他从 CPU 到 RAM 的 I/O 操作,我们可能会直接忘记任何关于提高性能的梦想。

根据 pybufrkit 的承诺,如果您的“过滤器”可以使用 pybufrkit-templates 进行编译,那么还有一次机会 - 获得 10% ~ 30% 的性能提升:

"(...) BUFR 模板编译
模板编译的主要目的是性能。但是由于位操作是整体中最耗时的部分性能提升有些有限,具体取决于要处理的消息的描述符总数,模板编译可提供 10 - 30% 的性能提升。 href="https://pybufrkit.readthedocs.io/en/latest/#bufr-template-compilation" rel="nofollow noreferrer">阅读文档 "

原样,熵减少的代码:

def load_decode_filter( file ):
    '''`
    Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open( file, 'rb' ) as ins: #----------------------------------- # ins CONTEXT-mgr
        for idx,         \
            bufr_message  \
            in             \
            enumerate( generate_bufr_message( decoder,                   #     LUXURY of enumerate for no real use
                                              ins.read() # <-------------# ins.
                                              )
                       ):
            input_list = FlatJsonRenderer().render( bufr_message )[3][2] #     LUXURY of JSON-(re)-)decorations
            mask = [ obj for obj in input_list                           #
                                 if ( (    PHI_MAX > obj[12] >    PHI_MIN )
                                    & ( LAMBDA_MAX > obj[13] > LAMBDA_MIN )
                                      )
                     ]
            output_message.extend( mask )
        return output_message

性能提示,如果既没有设法使用 pybufrkit 本机编译模板,也没有使用 pybufrkit 的本机脚本 CLI 任务并诉诸 Win/Py3 处理流程:

  • 考虑到主 Python 解释器进程的完整自上而下副本的无论如何支付的成本,您的工作人员将“知道”所有文件的列表,因此这是令人尴尬的独立逐个文件的过程将尽力:

  • gc.collect(); gc.disable() 在生成任何工作池之前

  • 生成与主机硬件上存在的 CPU-RAM 物理内存 I/O 通道一样少的 max_workers 个工作进程(任务受内存限制,而不是 CPU )

  • main() - 要处理的文件列表 - 使用 max_workers - 许多、平衡长度、不重叠的 元组( from_fileIDX, to_fileIDX)

  • executor.submit() 块处理函数引用,具有单个元组 ( from_, to_ ) 并将所有其余部分安排在此类块处理函数中,包括结果的延迟屏蔽文件 I/O 存储(稍后可以使用 O/S 文本/二进制文件合并来合并)

  • 更喜欢延迟屏蔽流,使用语法糖(ed)迭代器可能会很好教科书上的例子,但这里这些是(不可屏蔽的)性能杀手 - 收集一个巨大的 [ obj for obj in ... if ... ] 列表永远不会像流一样改进(可屏蔽延迟)流程,无需首先收集这样一个巨大的列表,只是接下来(重新)迭代这样一个巨大的列表,将这样的列表的项目逐一文件 I/O 到磁盘文件上。在一个单一的步骤流中更好地迭代/过滤/有条件地执行文件 I/O 操作(减少 RAM、避免附加开销以及所有可屏蔽延迟)

有关更多详细信息,您可能想阅读 和来自 这个和那里的直接示例。

Q :
" PROBLEM - ProcessPoolExecutor hasn't increased speed. Confirmed by tqdm "

A :
No,
with all respect,
your main problem is not the efficiency of ProcessPoolExecutor()-instance, but
your main problem is choosing performance / efficiency ( almost ) anti-patterns, which Python, the more Python-sub-processes in realms of Windows O/S will awfully punish with having you to wait for some 75 hours to collect all results (if the processing-pipeline does indeed what you expect it to do, which I cannot judge, but guess it will not ... for reasons listed below )

SUSPECT #1 :
best avoid 75 hours of producing nonsensical outputs :

Given the documented standard Py3 concurrent.futures.Executor()-instance .submit()-method's call-signature, your code does not meet this specification.

Instead of passing a reference to a function, the main(), being a calling-side, first performs for each and every of 6800 files a full, pure-[SERIAL] METOP-workpackage processing ( which produces some expensively collected huge list-of-messages ), which is then ( to the contrary of the documented requirement to pass a reference to a function / in-place lambda-operator ) again at awfully immense RAM/CPU/TIME expenses, SER/sent/DES-transferred to one of the Executor-managed pool of worker-processes ( which I doubt will be able to do anything reasonable upon receiving a list, instead of a function ( planned to be executed in such a remote process, over parameters delivered thereto - as per the calling-signature specifies ). Ouch...

def main( files_in ):
    '''                                                                 __doc__
    attempt to intiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor( max_workers = 6
                              )  as executor: #---------------------------# eXe CONTEXT-mgr
        
        with tqdm( range( len( files_in ) ),
                   desc     = 'files loaded',
                   position = 0
                   ) as progress: #---------------------------------------# pro CONTEXT-in-CONTEXT-mgr
            
            futures = []
            for file in files_in: #---------------------------------------#     LUXURY of top-level iterator, commanding  6800x times a pool of workers
                future = executor.submit( load_decode_filter( file ), #---#     ??? WHY CALC THE RESULT BEFORE passing it to the .submit() ???
                                                              file    #---#         std PARA
                                                              )
                future.add_done_callback( lambda p: progress.update() )   #     LUXURY of tdqm() for showing 75-hours of work ???
                futures.append( future ) #--------------------------------#     LUXURY of no performance gain
            
            results = []
            for future in futures:
                result = future.result()
                results.append( result ) #--------------------------------#     LUXURY of adverse performance gain
    
    with open( DIRECTORY + 'bufrout.json', 'w',
               encoding = 'utf-8'
               ) as f_o: #------------------------------------------------# f_o CONTEXT-mgr
        dump( results, f_o )

SUSPECT #2 :
better avoid any & all performance-degrading syntax-constructors,
if performance is the real goal to be achieved :

Avoid any and all sins of typing a kind of low-hanging-fruits SLOC-s, which seem "sexy", but having been paid by immense add-on overhead costs.

Design process-flow such that we may improve End-to-End processing times by latency-masking, where possible ( file-I/O being a classical case ) and avoiding any reducible steps at all ( creation of named-variables (sometimes never used ) is similar sin ).

Given you run inside Windows O/S, your ( tho' hidden ) sub-process-instantiation costs are the highest of all other cases - Windows O/S will be spawning a full top-down copy of the Python interpreter-process, with all data-structures et al, so if that causes your physical RAM to get "over-crowded", the O/S will start ( for the rest of those 75 hours ... ) a nasty war of thrashing Virtual-Memory-managed file-I/O-transfers ( ~ 10.000x bigger latency ) from-RAM-to-disk & from-disk-to-RAM. That will efficiently damage any other CPU-from/to-RAM I/O-operations and we may straight forget any dreams about increasing performance.

From pybufrkit promises, there is one more chance - getting 10% ~ 30% performance boost - if your "filter" is compilable using pybufrkit-templates :

"(...) BUFR Template Compilation
The main purpose of Template Compilation is performance. However since bit operations are the most time consuming part in the overall processing. The performance gain somewhat is limited. Depending on the total number of descriptors to be processed for a message, template compilation provides 10 - 30% performance boost. Read the Docs "

As-was, entropy-reduced code :

def load_decode_filter( file ):
    '''`
    Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open( file, 'rb' ) as ins: #----------------------------------- # ins CONTEXT-mgr
        for idx,         \
            bufr_message  \
            in             \
            enumerate( generate_bufr_message( decoder,                   #     LUXURY of enumerate for no real use
                                              ins.read() # <-------------# ins.
                                              )
                       ):
            input_list = FlatJsonRenderer().render( bufr_message )[3][2] #     LUXURY of JSON-(re)-)decorations
            mask = [ obj for obj in input_list                           #
                                 if ( (    PHI_MAX > obj[12] >    PHI_MIN )
                                    & ( LAMBDA_MAX > obj[13] > LAMBDA_MIN )
                                      )
                     ]
            output_message.extend( mask )
        return output_message

Performance tips, if neither managed to use the pybufrkit native compiled-templates nor native-scripted CLI tasking of pybufrkit and resort to Win/Py3 flow of processing :

  • given the anyway paid costs of full top-bottom copies of main-Python interpreter process, your workers shall "know" the list-of-all-files, so this embarrasingly independent file-by-file process will do best to :

  • gc.collect(); gc.disable() before spawning any pool of workers

  • spawn as few max_workers worker-processes as CPU-RAM physical memory-I/O-channels are present on your host hardware ( the tasks are memory-bound, not CPU )

  • split, on the main()-side the list-of-files to process - using max_workers-many, balanced-length, non-overlapping tuples of ( from_fileIDX, to_fileIDX )

  • executor.submit() a block-processing function-reference, with a single tuple of ( from_, to_ ) and arrange all the rest inside such block-processing function, including the latency-masked file-I/O storage of results ( possible to later merge, using O/S text/binary-file merging )

  • prefer latency-masking flows, using syntax-sugar(ed) iterators might be nice in school-book examples, but here these are ( un-maskable ) performance killers - collecting a huge-list of [ obj for obj in ... if ... ] is never to improve stream-alike ( maskable latency ) process-flow, without first collecting such a huge-list, just to next (re)-iterate such a huge-list to file-I/O such list's items one by one onto disk-file. Better iterate/filter/conditionally execute file-I/O-ops in one, single stream-of-steps ( reducing RAM, avoiding add-on overheads & all that with maskable latencies )

For more details you may like to read this and code from this and there directed examples.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文