为许多文件使用多处理的最佳方法
因此,我有大量的文件列表需要将其处理到CSV中。每个文件本身都很大,每行都是字符串。文件的每一行都可以代表三种类型的数据之一,每个数据的处理方式有所不同。我当前的解决方案看起来如下:
type1_columns = [...]
type2_columns = [...]
type3_columns = [...]
file_list = os.listdir(filelist)
def process_type1_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type1_series = pd.Series(to_append, index=type1_columns)
return type1_series
def process_type2_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type2_series = pd.Series(to_append, index=type2_columns)
return type2_series
def process_type3_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type3_series = pd.Series(to_append, index=type3_columns)
return type3_series
def process_file(file):
type1_df = pd.DataFrame(columns=type1_columns)
type2_df = pd.DataFrame(columns=type2_columns)
type3_df = pd.DataFrame(columns=type3_columns)
with open(filepath/file) as f:
data=f.readlines()
for line in data:
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
type1_df = type1_df.append(type1_series, ignore_index=True)
if record_type == "type2":
type2_series = process_type2_line(json_line)
type2_df = type2_df.append(type2_series, ignore_index=True)
if record_type == "type3":
type3_series = process_type3_line(json_line)
type3_df = type3_df.append(type3_series, ignore_index=True)
type1_df.to_csv(type1_csv_path.csv)
type2_df.to_csv(type2_csv_path.csv)
type3_df.to_csv(type3_csv_path.csv)
for file in file_list:
process_file(file)
我通过文件迭代,并为三种不同类型的记录中的每一个创建数据范围。我解析了线条,并为每个行调用适当的处理功能。返回的系列将附加到该文件的该record_type的最终数据框架上。处理文件后,将三个数据范围保存为CSV,我们从下一个文件开始。
问题在于,这种方法需要太长时间了,我需要数周才能处理所有文件。
我试图通过使用多处理(我没有很多经验)来修改我的方法:
with ThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(process_file, file) for file in file_list]
在某些记录打印语句中,我可以看到这开始了30个文件的处理,但没有完成,所以我至少至少我已经完成了知道我的方法有缺陷。谁能解释该问题的最佳方法是什么?也许多处理和异步的某种组合?
So I have a large list of files that need to be processed into CSVs. Each file itself is quite large, and each line is a string. Each line of the files could represent one of three types of data, each of which is processed a bit differently. My current solution looks like the following:
type1_columns = [...]
type2_columns = [...]
type3_columns = [...]
file_list = os.listdir(filelist)
def process_type1_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type1_series = pd.Series(to_append, index=type1_columns)
return type1_series
def process_type2_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type2_series = pd.Series(to_append, index=type2_columns)
return type2_series
def process_type3_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type3_series = pd.Series(to_append, index=type3_columns)
return type3_series
def process_file(file):
type1_df = pd.DataFrame(columns=type1_columns)
type2_df = pd.DataFrame(columns=type2_columns)
type3_df = pd.DataFrame(columns=type3_columns)
with open(filepath/file) as f:
data=f.readlines()
for line in data:
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
type1_df = type1_df.append(type1_series, ignore_index=True)
if record_type == "type2":
type2_series = process_type2_line(json_line)
type2_df = type2_df.append(type2_series, ignore_index=True)
if record_type == "type3":
type3_series = process_type3_line(json_line)
type3_df = type3_df.append(type3_series, ignore_index=True)
type1_df.to_csv(type1_csv_path.csv)
type2_df.to_csv(type2_csv_path.csv)
type3_df.to_csv(type3_csv_path.csv)
for file in file_list:
process_file(file)
I iterate through the files, and create dataframes for each of the three different types of records. I parse through the lines and call the appropriate processing function for each. The returned series is appended to the final dataframe for that record_type for that file. Once the file is processed, the three dataframes are saved as CSVs and we begin with the next file.
The issues is that this approach takes far too long, it would take weeks for me to process all of the files.
I tried to modify my approach by using multiprocessing (which I don't have a ton of experience with) with the following:
with ThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(process_file, file) for file in file_list]
In some logging print statements I can see that this started the processing for 30 files but none have completed so I at least know that my approach is flawed. Could anyone explain what the best approach to this problem would be? Perhaps some combination of multiprocessing and asyncio?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您有两个大问题:
您将整个输入文件加载到内存中,在内存中产生整个结果,然后一次编写整个输出文件。这意味着,如果您有30名并行操作的工人,则需要与30个(自描述)大型文件相称的内存。您也将所有数据存储两次,一次为
list
str
f.readlines()返回的行()
,然后再次在其中一个三个dataframe
s;如果您使用的代码,则没有执行者,并且只是更改:to:
您会立即将记忆使用量减少大约一半,这可能足以阻止页面折磨。也就是说,您仍将使用与文件大小相称的内存来存储
dataframe
s,因此,如果您并行代码并行,您仍会恢复thrash,并且即使文件是文件为足够大。您正在使用
.append
for 每个行,iirc,对于dataframe
s是schlemiel painter的算法的一种形式:每个附加
制作一个全新的dataframe
,复制旧dataframe
的全部内容以及将新数据少的新数据纳入新dataFrame
,随着现有数据变大,工作越来越长;应该摊销的o(n)
工作变为o(n ** 2)
工作。在其中两个之间,您使用的是 Way 的内存,比所需的内存更多,和在重复的附录上执行不必要的繁忙工作的 ton 。并行性可能有助于更快地完成忙碌的工作,但是作为交换,它将您的内存需求增加了30倍。奇怪的是,您没有太多的RAM(如果这些文件确实很大,您可能甚至没有足够的RAM来用于其中一个文件),最终您会thrashing(将内存写入PageFile/swap文件,为其他东西腾出空间,按需阅读它,并经常丢弃在您处理之前的记忆中,这使记忆访问与磁盘性能绑定在一起,这比RAM访问速度慢了几个数量级)。
我不太了解大熊猫是否为您的工作提供了更好的,增量的解决方案,但您实际上并不需要一个。只需按行与输入线一起工作,然后使用
csv
模块在您走时编写行。您的内存需求将从“与输入文件的每个行的数据相称”,从“比例到每个输入文件的大小”。您的
process_file
函数最终会看起来像以下内容:如果此操作(无执行者),请使用它。如果您即使没有执行者即使在页面上都在打击,或者文件足够大,以至于重复
附加
s严重伤害了您,这可能足以使其本身能够全部工作。如果它太慢,则执行者可能会如果您正在做 lot 的工作来将每行处理为输出格式(因为大多数工人正在处理,但一两个工人可以充分地共享磁盘访问读物和写入),但是如果每行处理工作较低,那么少数工人的任何东西(我只有两个或三个)只会增加磁盘的争论(尤其是如果您使用的是旋转磁盘硬盘驱动器,而不是SSD),并且并行性要么无济于事,要么会积极伤害。您可能需要调整所使用的精确的CSV方言(作为参数传递给
csv.writer
),并可能明确设置特定的编码
,而不是输出文件默认情况(例如传递encoding ='utf-8'
或encoding ='utf-16'
open
s始终写作在编码.csv
文件期望的消费者中,但这是一般的形式。You've got two big problems:
You're loading the whole input file into memory, producing the entire result in memory, then writing the whole output file at once. This means that, if you have 30 workers operating in parallel, you need memory proportionate to 30 of your (self-described) large files. You store all the data twice as well, once as the
list
ofstr
lines returned byf.readlines()
, then again in one of the threeDataFrame
s; if you used your code, without executors, as is, and just changed:to:
you'd immediately reduce your memory usage by roughly half, which (might) be enough to stop page thrashing. That said, you'd still be using memory proportionate to file size to store the
DataFrame
s, so you'd resume thrashing if you parallelized your code, and might still thrash even without parallelism if the files are large enough.You're using
.append
for every line, which, IIRC, forDataFrame
s is a form of Schlemiel the Painter's algorithm: Eachappend
makes a brand-newDataFrame
, copying the entire contents of the oldDataFrame
plus the tiny amount of new data into a newDataFrame
, with the work taking longer and longer as the existing data gets bigger; what should be amortizedO(n)
work becomesO(n**2)
work.Between the two of those, you're using way more memory than needed, and performing a ton of unnecessary busywork on the repeated appends. The parallelism might help do the busywork faster, but in exchange, it increases your memory requirements by a factor of 30x; odds are, you don't have that much RAM (if these files are truly large, you may not have enough RAM for even one of the files), and you end up page thrashing (writing memory out to the pagefile/swap file to make room for other stuff, reading it back in on demand, and frequently dropping memory that's paged in before you're through with it, making memory access tied to disk performance, which is a few orders of magnitude slower than RAM access).
I don't know Pandas well enough to say if it offers some better, incremental solution for what you're doing, but you don't really need one; just work with the input line by line, and use the
csv
module to write rows as you go. Your memory requirements will drop from "proportionate to the size of each input file" to "proportionate to the data from each line of the input file".Your
process_file
function would end up looking something like:If this works as is (without executors), use it that way. If you were page thrashing even without executors, or the files were large enough that the repeated
append
s hurt you badly, this might be enough to make it work all by itself. If it's too slow, executors might provide a little benefit if you're doing a lot of work to process each line into the output format (because while most workers are processing, one or two workers can adequately share disk access for reads and writes), but if the processing work per line is low, anything over a small handful of workers (I'd start with just two or three) would just increase disk contention (especially if you're using a spinning disk hard drive, not an SSD), and parallelism either won't help, or will actively harm.You may need to tweak the exact CSV dialect used (passed as arguments to
csv.writer
), and possibly explicitly set a specificencoding
for the output files rather than the locale default (e.g. passingencoding='utf-8'
orencoding='utf-16'
to theopen
s for write, so it's always writing in the encoding the consumer(s) of the.csv
files expects), but that's the general form to go for.