Large dataset, ProcessPoolExecutor problem
PROBLEM - ProcessPoolExecutor hasn't increased speed, as confirmed by tqdm.
I have learned enough Python to copy and/or write a program that works. Each file takes ~40 seconds to load -> filter -> write, and I have ~6,800 files to work through. I wanted a better version that uses all my processing power (6 cores), so I tried to write one (below). That version does produce output, but it is slightly slower than my original function:
from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer
decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY+'*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers
def load_decode_filter(file):
    '''
    Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open(file, 'rb') as ins:
        for bufr_message in generate_bufr_message(
                decoder, ins.read()):
            input_list = FlatJsonRenderer().render(bufr_message)[3][2]  # necessary for [mask] to function
            mask = [obj for obj in input_list
                    if ((PHI_MAX > obj[12] > PHI_MIN)
                        & (LAMBDA_MAX > obj[13] > LAMBDA_MIN))]
            output_message.extend(mask)
    return output_message
def main(files_in):
    '''
    attempt to initiate all cores in loading and filtering bufr files
    '''
    with ProcessPoolExecutor(max_workers=6) as executor:
        with tqdm(range(len(files_in)), desc='files loaded',
                  position=0) as progress:
            futures = []
            for file in files_in:
                future = executor.submit(load_decode_filter(file), file)
                future.add_done_callback(lambda p: progress.update())
                futures.append(future)
            results = []
            for future in futures:
                result = future.result()
                results.append(result)
    with open(DIRECTORY + 'bufrout.json', 'w', encoding='utf-8') as f_o:
        dump(results, f_o)

if __name__ == '__main__':
    main(files)
I was hoping to at least cut processing time per file.
Update, Closing:
First of all, I'd like to thank everyone who commented as well as the answerer (I'm too new to upvote). It seems the only way to meaningfully increase efficiency would be to never decode in the first place and to take what I want directly from the in-situ BUFR data, which is simply beyond my current ability (this is my first exposure to code of any kind).
I plan to keep running my initial version (f.bufr in, f.bufr_.txt out) as I am able, and I'll move the processed files to a subdirectory after each "run". The silver lining is that I've learned enough doing this to be able to write a program that combines all the text output into one file. Thanks again.
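A minimal sketch of such a merge step, assuming the per-file outputs are plain-text files matching an illustrative *.bufr_.txt pattern in the same directory and that simple concatenation is what is wanted (the pattern and output name are placeholders, not from the question):

from glob import glob

DIRECTORY = 'C://blah/'                          # same directory as in the question
OUT_PATH = DIRECTORY + 'combined_output.txt'     # illustrative output name

def merge_text_outputs(pattern='*.bufr_.txt'):
    '''Concatenate every per-file text output into a single file.'''
    with open(OUT_PATH, 'w', encoding='utf-8') as out:
        for path in sorted(glob(DIRECTORY + pattern)):
            with open(path, 'r', encoding='utf-8') as part:
                out.write(part.read())
                out.write('\n')                  # separate the individual outputs

if __name__ == '__main__':
    merge_text_outputs()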
A:
No, with all respect, your main problem is not the efficiency of the ProcessPoolExecutor() instance. Your main problem is a choice of performance/efficiency (almost) anti-patterns, which Python, and even more so Python sub-processes in the realm of the Windows O/S, will punish awfully, leaving you to wait some 75 hours to collect all results (if the processing pipeline indeed does what you expect it to do, which I cannot judge, but I guess it will not, for the reasons listed below).
SUSPECT #1:
Best avoid 75 hours of producing nonsensical outputs:
Given the documented call signature of the standard Py3 concurrent.futures.Executor() instance's .submit() method, your code does not meet that specification. Instead of passing a reference to a function, the calling side in main() first performs, for each and every one of the 6,800 files, the full, pure-[SERIAL] METOP work-package processing (which produces an expensively collected, huge list of messages). That list is then, contrary to the documented requirement to pass a function reference or in-place lambda, SER-ialised, sent and DES-erialised at immense additional RAM/CPU/TIME expense to one of the Executor-managed pool of worker processes, which I doubt can do anything reasonable upon receiving a list instead of a function (one planned to be executed in such a remote process, over the parameters delivered to it, as the call signature specifies). Ouch...
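For contrast, a minimal sketch of a submission loop that does match the documented Executor.submit(fn, *args) call signature, reusing load_decode_filter and the imports from the question; it only fixes the callable/argument mix-up and deliberately ignores the other points raised below:

def main(files_in):
    with ProcessPoolExecutor(max_workers=6) as executor:
        # pass the function itself plus its argument; the call then happens
        # inside the worker process, not on the calling side in main()
        futures = [executor.submit(load_decode_filter, file)
                   for file in files_in]
        return [future.result() for future in futures]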
SUSPECT #2:
Better avoid any and all performance-degrading syntax constructors, if performance is the real goal to be achieved:
Avoid the sin of typing low-hanging-fruit SLOCs that look "sexy" but are paid for with immense add-on overhead costs.
Design the process flow so that end-to-end processing time can be improved by latency masking where possible (file I/O is a classic case), and avoid any reducible steps at all (creating named variables that are sometimes never used is a similar sin).
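As one concrete reading of the "avoid reducible steps / mask file-I/O latency" advice, here is a hedged sketch (not part of the original answer) that writes each in-bounds observation as soon as it is filtered instead of first collecting a huge list; it assumes the module-level names from the question (decoder, generate_bufr_message, FlatJsonRenderer and the PHI/LAMBDA bounds) are in scope:

def filter_and_write(file, out):
    '''Decode one BUFR file and write the in-bounds observations straight to `out`.'''
    renderer = FlatJsonRenderer()
    with open(file, 'rb') as ins:
        for bufr_message in generate_bufr_message(decoder, ins.read()):
            for obj in renderer.render(bufr_message)[3][2]:
                # same bounding-box test as the question's list comprehension
                if PHI_MAX > obj[12] > PHI_MIN and LAMBDA_MAX > obj[13] > LAMBDA_MIN:
                    out.write(str(obj) + '\n')   # write immediately, no intermediate list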
Given that you run inside the Windows O/S, your (though hidden) sub-process instantiation costs are the highest of all cases: Windows spawns a full top-down copy of the Python interpreter process, with all its data structures et al., so if that causes your physical RAM to get "over-crowded", the O/S will start (for the rest of those 75 hours...) a nasty war of thrashing, with virtual-memory-managed file-I/O transfers (roughly 10,000x larger latency) from RAM to disk and from disk to RAM. That will effectively damage every other CPU-to/from-RAM operation, and we may straight away forget any dreams about increasing performance.
From what pybufrkit promises, there is one more chance of a 10% ~ 30% performance boost, if your "filter" is compilable using pybufrkit templates.
As-was, entropy-reduced code:
Performance tips, if you manage to use neither the pybufrkit native compiled templates nor pybufrkit's native-scripted CLI tasking, and resort to a Win/Py3 flow of processing: given the anyway-paid costs of full top-down copies of the main Python interpreter process, your workers will "know" the list of all files, so this embarrassingly independent, file-by-file process will do best to:
- gc.collect(); gc.disable() before spawning any pool of workers
- spawn only as few max_workers worker processes as there are CPU-RAM physical memory-I/O channels on your host hardware (the tasks are memory-bound, not CPU-bound)
- split, on the main() side, the list of files to process into max_workers-many balanced-length, non-overlapping tuples of ( from_fileIDX, to_fileIDX )
- executor.submit() a block-processing function reference, with a single ( from_, to_ ) tuple
- arrange all the rest inside such a block-processing function, including the latency-masked file-I/O storage of results (which can later be merged using O/S text/binary-file merging); see the sketch after this list
- prefer latency-masking flows: syntax-sugared iterators may be nice in schoolbook examples, but here they are (un-maskable) performance killers. Collecting a huge [ obj for obj in ... if ... ] list never improves a stream-alike (maskable-latency) process flow; there is no point in first collecting such a huge list only to (re-)iterate over it and file-I/O its items one by one onto a disk file. Better to iterate, filter and conditionally execute the file-I/O ops in one single stream of steps (less RAM, no add-on overheads, and all of that with maskable latencies).
For more details you may like to read this and the code from this and the examples referenced there.
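A hedged sketch of how those tips could fit together, assuming the module-level setup from the question (decoder, files, DIRECTORY, the PHI/LAMBDA bounds) and the filter_and_write() helper sketched earlier live in the same module; the chunk-output naming and the worker count are illustrative, not prescriptive:

import gc
from concurrent.futures import ProcessPoolExecutor

def process_block(bounds):
    '''Worker: filter every file in the half-open index range into one text output.'''
    from_idx, to_idx = bounds
    out_path = DIRECTORY + 'bufrout_{}_{}.txt'.format(from_idx, to_idx)  # illustrative name
    with open(out_path, 'w', encoding='utf-8') as out:
        for file in files[from_idx:to_idx]:
            filter_and_write(file, out)              # stream results straight to disk
    return out_path

def main_blocks(n_workers=6):
    '''Split the file list into n_workers balanced, non-overlapping index ranges.'''
    gc.collect()
    gc.disable()                                     # per the first tip above
    step = -(-len(files) // n_workers)               # ceiling division
    blocks = [(i, min(i + step, len(files)))
              for i in range(0, len(files), step)]
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = [executor.submit(process_block, block) for block in blocks]
        return [future.result() for future in futures]

if __name__ == '__main__':
    print(main_blocks())

Note that under the Windows spawn model each worker re-imports the module, so the module-level files list is rebuilt in every worker process, which is how the workers come to "know" the full list as described above.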
is never to improve stream-alike ( maskable latency ) process-flow, without first collecting such a huge-list, just to next (re)-iterate such a huge-list to file-I/O such list's items one by one onto disk-file. Better iterate/filter/conditionally execute file-I/O-ops in one, single stream-of-steps ( reducing RAM, avoiding add-on overheads & all that with maskable latencies )For more details you may like to read this and code from this and there directed examples.