使用 Pool 在 Python 中多重处理 CSV 块

发布于 2025-01-11 16:17:38 字数 1334 浏览 0 评论 0原文

我正在尝试在虚拟机上处理非常大的 CSV 文件(大约 6 GB 的 .gz 文件),为了加快速度,我正在研究各种多处理工具。我对此还很陌生,所以我正在不断学习,但到目前为止,我从一天的研究中得到的结果是,池非常适合依赖 CPU 的任务。

我正在处理一个非常大的 CSV,方法是将其分成设定大小的块,然后单独处理这些块。目标是能够并行处理这些块,但不需要首先创建包含所有块的数据帧列表,因为这本身会花费很长时间。块处理几乎完全基于 pandas(不确定这是否相关),所以我不能使用 dask。然后,处理函数之一将我的结果写入输出文件。理想情况下,我想保留结果的顺序,但如果我做不到,我可以稍后尝试解决它。这是我到目前为止得到的结果:

if __name__ == "__main__":
   parser = parse()
   args = parser.parse_args()
   
   a = Analysis( vars( args ) )
   
   attributes = vars( a )
   
   count = 0
   pool = mp.Pool( processes = mp.cpu_count() )
   
   for achunk in pd.read_csv( a.File,
                              compression      = 'gzip',
                              names            =  inputHeader,
                              chunksize        =  simsize,
                              skipinitialspace =  True,
                              header           =  None
                              ):
       pool.apply_async( a.beginProcessChunk( achunk,
                                              start_time,
                                              count
                                              )
                         )
       count += 1

这最终需要与串行运行相同的时间(在小文件上测试),并且实际上需要更长的时间。我不确定我到底做错了什么,但我假设将池函数放入循环中不会使循环过程并行。我对此真的很陌生,所以也许我只是错过了一些微不足道的东西,所以我提前对此感到抱歉。谁能给我一些建议和/或告诉我到底如何才能完成这项工作?

I'm trying to process a very large CSV file (.gz files of around 6 GB) on a VM, and to speed things up I'm looking into various multiprocessing tools. I'm pretty new to this so I'm learning as I go but so far what I got from a day of research is that pool works great for CPU reliant tasks.

I'm processing a very large CSV by dividing it into chunks of set size, and processing those chunks individually. The goal is to be able to process these chunks in parallel, but without needing to first create a list of dataframes with all the chunks in it, as that would take a really long time in itself. Chunk processing is almost entirely pandas based (not sure if that's relevant) so I can't use dask. One of the processing functions then writes my results to an outfile. Ideally I would like to preserve the order of the results, but if I can't do that I can try to work around it later. Here's what I got so far:

if __name__ == "__main__":
   parser = parse()
   args = parser.parse_args()
   
   a = Analysis( vars( args ) )
   
   attributes = vars( a )
   
   count = 0
   pool = mp.Pool( processes = mp.cpu_count() )
   
   for achunk in pd.read_csv( a.File,
                              compression      = 'gzip',
                              names            =  inputHeader,
                              chunksize        =  simsize,
                              skipinitialspace =  True,
                              header           =  None
                              ):
       pool.apply_async( a.beginProcessChunk( achunk,
                                              start_time,
                                              count
                                              )
                         )
       count += 1

This ultimately takes the same amount of time as running it serially (tested on a small file), and it actually takes a tiny bit longer. I'm not sure exactly what I'm doing wrong but I'm assuming that putting the pool function inside a loop won't make the loop process in parallel. I'm really new to this so maybe I'm just missing something trivial, so I'm sorry in advance for that. Could anyone give me some advice on this and/or tell me how exactly I can make this work?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文