What's the best way in Python to split a large file for processing with multiprocessing?

Posted 2024-08-12 23:43:47

I run across a lot of "embarrassingly parallel" projects I'd like to parallelize with the multiprocessing module. However, they often involve reading in huge files (greater than 2 GB), processing them line by line, running basic calculations, and then writing results. What's the best way to split a file and process it using Python's multiprocessing module? Should Queue or JoinableQueue in multiprocessing be used? Or the Queue module itself? Or should I map the file iterable over a pool of processes using multiprocessing? I've experimented with these approaches, but the overhead of distributing the data line by line is immense. I've settled on a lightweight pipe-filter design using cat file | process1 --out-file out1 --num-processes 2 | process2 --out-file out2, which passes a certain percentage of the first process's input directly to the second input (see this post), but I'd like a solution contained entirely in Python.

Surprisingly, the Python documentation doesn't suggest a canonical way of doing this (despite a lengthy section on programming guidelines in the multiprocessing documentation).

Thanks,
Vince

Additional information: Processing time per line varies. Some problems are fast and barely not I/O-bound, some are CPU-bound. The CPU-bound, non-dependent tasks will gain the most from parallelization, such that even inefficient ways of assigning data to a processing function would still be beneficial in terms of wall-clock time.

A prime example is a script that extracts fields from lines, checks a variety of bitwise flags, and writes lines with certain flags to a new file in an entirely new format. This seems like an I/O-bound problem, but when I ran it with my cheap concurrent version with pipes, it was about 20% faster. When I run it with Pool and map, or a Queue, in multiprocessing, it is always over 100% slower.
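
For concreteness, here is a minimal sketch (the file names and the per-line function are placeholders, not from the original post) of the "map the file iterable over a pool of processes" approach mentioned above; each line becomes its own task, which is exactly where the per-line dispatch overhead shows up.

```python
import multiprocessing

def process_line(line):
    # Placeholder for the real per-line computation.
    return line.upper()

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool, \
            open("big_input.txt") as infile, \
            open("out.txt", "w") as outfile:
        # imap streams the file's lines to the pool, one task per line.
        for result in pool.imap(process_line, infile):
            outfile.write(result)
```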

Comments (7)

情定在深秋 2024-08-19 23:43:47

One of the best architectures is already part of the Linux OS. No special libraries are required.

You want a "fan-out" design.

  1. A "main" program creates a number of subprocesses connected by pipes.

  2. The main program reads the file, writing lines to the pipes and doing the minimum filtering required to deal the lines to the appropriate subprocesses.

Each subprocess should probably be a pipeline of distinct processes that read and write from stdin.

You don't need a queue data structure; that's exactly what an in-memory pipeline is: a queue of bytes between two concurrent processes.
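
A rough sketch of that fan-out shape, assuming a hypothetical worker.py filter that reads lines from stdin and writes its own output file; the parent deals lines to the workers round-robin over OS pipes.

```python
import subprocess
import sys

NUM_WORKERS = 4  # illustrative

def fan_out(path):
    # Spawn worker subprocesses, each connected to the parent by a pipe.
    workers = [
        subprocess.Popen(
            [sys.executable, "worker.py", "--out-file", f"out{i}.txt"],
            stdin=subprocess.PIPE,
        )
        for i in range(NUM_WORKERS)
    ]
    with open(path, "rb") as infile:
        for i, line in enumerate(infile):
            # Minimal "filtering": deal each line to the next worker in turn.
            workers[i % NUM_WORKERS].stdin.write(line)
    for w in workers:
        w.stdin.close()  # EOF tells the worker it can finish up
        w.wait()

if __name__ == "__main__":
    fan_out("big_input.txt")
```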

鲜肉鲜肉永远不皱 2024-08-19 23:43:47

One strategy is to assign each worker an offset, so if you have eight worker processes you assign them numbers 0 to 7. Worker number 0 reads the first record, processes it, then skips 7 and goes on to process the 8th record, and so on; worker number 1 reads the second record, then skips 7 and processes the 9th record, and so forth.

There are a number of advantages to this scheme. It doesn't matter how big the file is, the work is always divided evenly; processes on the same machine will process at roughly the same rate and use the same buffer areas, so you don't incur any excessive I/O overhead. As long as the file hasn't been updated, you can rerun individual threads to recover from failures.
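
A sketch of that offset scheme, with the file name and per-record work as placeholders: every worker scans the whole file but only processes the records that fall on its own stride.

```python
import multiprocessing

NUM_WORKERS = 8

def process_record(line):
    pass  # placeholder for the real per-record computation

def worker(worker_id, path):
    with open(path) as f:
        for record_number, line in enumerate(f):
            if record_number % NUM_WORKERS == worker_id:
                process_record(line)  # only this worker's share of records

if __name__ == "__main__":
    procs = [multiprocessing.Process(target=worker, args=(k, "big_input.txt"))
             for k in range(NUM_WORKERS)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```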

篱下浅笙歌 2024-08-19 23:43:47

You don't mention how you are processing the lines, which is possibly the most important piece of information.

Is each line independent? Is the calculation dependent on one line coming before the next? Must they be processed in blocks? How long does the processing for each line take? Is there a processing step that must incorporate "all" the data at the end? Or can intermediate results be thrown away and just a running total maintained? Can the file be initially split by dividing the file size by the number of threads? Or does it grow as you process it?

If the lines are independent and the file doesn't grow, the only coordination you need is to farm out "starting addresses" and "lengths" to each of the workers; they can independently open and seek into the file, and then you simply coordinate their results, perhaps by waiting for N results to come back into a queue.

If the lines are not independent, the answer will depend highly on the structure of the file.
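
For the independent-lines case, here is a rough sketch of that coordination (the byte ranges and per-line work are illustrative, and the ranges are assumed to already fall on line boundaries; the next answer shows one way to compute them): each worker opens and seeks into the file on its own and reports a partial result on a queue, and the parent simply waits for N results.

```python
import multiprocessing

def worker(path, start, length, results):
    count = 0
    with open(path, "rb") as f:
        f.seek(start)
        for line in f.read(length).splitlines():
            count += 1  # placeholder: the real per-line work goes here
    results.put(count)

if __name__ == "__main__":
    chunks = [(0, 1_048_576), (1_048_576, 1_048_576)]  # illustrative (start, length) pairs
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker,
                                     args=("big_input.txt", start, length, results))
             for start, length in chunks]
    for p in procs:
        p.start()
    partials = [results.get() for _ in procs]  # wait for N results to come back
    for p in procs:
        p.join()
    print(sum(partials))
```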

两仪 2024-08-19 23:43:47

It depends a lot on the format of your file.

Does it make sense to split it anywhere? Or do you need to split it at a new line? Or do you need to make sure that you split it at the end of an object definition?

Instead of splitting the file, you should use multiple readers on the same file, using os.lseek to jump to the appropriate part of the file.

Update: The poster added that he wants to split on newlines. Then I propose the following:

Let's say you have 4 processes. Then the simple solution is to os.lseek to 0%, 25%, 50% and 75% of the file, and read bytes until you hit the first newline. That's your starting point for each process. You don't need to split the file to do this; just seek to the right location in the large file in each process and start reading from there.
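
A minimal sketch of that idea (the file name and process count are illustrative), using a regular file object's seek/readline rather than os.lseek on a raw descriptor: seek to an even fraction of the file size, discard the partial line, and record where the next full line begins.

```python
import os

def chunk_starts(path, num_procs):
    size = os.path.getsize(path)
    starts = [0]
    with open(path, "rb") as f:
        for i in range(1, num_procs):
            f.seek(size * i // num_procs)
            f.readline()             # skip the partial line at the boundary
            starts.append(f.tell())  # next process starts on a full line
    return starts

if __name__ == "__main__":
    # Each process seeks to its own start and reads until the next
    # process's start (or end of file for the last one).
    print(chunk_starts("big_input.txt", 4))
```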

寂寞陪衬 2024-08-19 23:43:47

Fredrik Lundh's Some Notes on Tim Bray's Wide Finder Benchmark is an interesting read, about a very similar use case, with a lot of good advice. Various other authors also implemented the same thing, some are linked from the article, but you might want to try googling for "python wide finder" or something to find some more. (there was also a solution somewhere based on the multiprocessing module, but that doesn't seem to be available anymore)

九局 2024-08-19 23:43:47

I know you specifically asked about Python, but I would encourage you to look at Hadoop (http://hadoop.apache.org/): it implements the Map and Reduce algorithm, which was specifically designed to address this kind of problem.

Good luck

抠脚大汉 2024-08-19 23:43:47

If the run time is long, instead of having each process read its next line through a Queue, have the processes read batches of lines. This way the overhead is amortized over several lines (e.g. thousands or more).
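
A sketch of that batching idea (the batch size and file name are illustrative): hand each worker a list of lines instead of a single line, so the inter-process overhead is paid once per batch rather than once per line.

```python
import itertools
import multiprocessing

def process_batch(lines):
    # Placeholder: apply the real per-line computation to a whole batch.
    return [line.upper() for line in lines]

def batches(iterable, size):
    # Yield successive lists of up to `size` items from the iterable.
    it = iter(iterable)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:
            return
        yield batch

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool, \
            open("big_input.txt") as infile:
        for processed in pool.imap(process_batch, batches(infile, 10000)):
            for line in processed:
                pass  # write or otherwise consume the processed lines
```

Note that Pool.map and Pool.imap also accept a chunksize argument, which batches tasks in a similar way without an explicit batching helper.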
