pyarrow write_dataset per-partition file limit
Let's say I've got some data in an existing dataset that has no column partitioning but still spans 200+ files. I want to rewrite that data into a Hive-partitioned layout. The dataset is too big to open in its entirety in memory. In this case the dataset came from a CETAS in Azure Synapse, but I don't think that matters.
I do:

import pyarrow as pa
import pyarrow.dataset as ds

# datadir, newdatarootdir and abfs (the Azure filesystem object) are defined elsewhere
dataset = ds.dataset(datadir, format="parquet", filesystem=abfs)
new_part = ds.partitioning(pa.schema([("nodename", pa.string())]), flavor="hive")
scanner = dataset.scanner()
ds.write_dataset(scanner, newdatarootdir,
                 format="parquet", partitioning=new_part,
                 existing_data_behavior="overwrite_or_ignore",
                 max_partitions=2467)
In this case there are 2467 unique nodenames, so I want 2467 directories, each with 1 file. However, what I get is 2467 directories, each with 100+ files of about 10 KB each. How do I get just 1 file per partition?
I could do a second step:

import os
import pyarrow.parquet as pq

# re-read each partition's many small files and rewrite them as one file per node
for node in nodes:
    fullpath = f"{datadir}\\{node}"
    curnodeds = ds.dataset(fullpath, format="parquet")
    curtable = curnodeds.to_table()
    os.makedirs(f"{newbase}\\{node}")
    pq.write_table(curtable, f"{newbase}\\{node}\\part-0.parquet",
                   version="2.6", flavor="spark", data_page_version="2.0")
Is there a way to incorporate the second step into the first step?
Edit:
This is with pyarrow 7.0.0
Edit(2):
Using max_open_files=3000 did result in one file per partition. The metadata comparison between the two is that the two-step approach has (for one partition)...
<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A40>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 1
format_version: 2.6
serialized_size: 1673
size on disk: 278 KB
and the one step...
<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A90>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 188
format_version: 1.0
serialized_size: 148313
size on disk: 1.04 MB
Obviously in the two-step version I'm explicitly setting the version to 2.6, so that explains that difference. The total size of my data after the two-step approach is 655 MB, vs 2.6 GB after the one-step. The time difference was pretty significant too: the two-step approach took something like 20 minutes for the first step and 40 minutes for the second, while the one-step took about 5 minutes for the whole thing.
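For reference, the adjusted one-step call looked roughly like this (a sketch reusing the names from above, not the exact code):

# fresh scanner over the source dataset; max_open_files raised above the partition count
ds.write_dataset(dataset.scanner(), newdatarootdir,
                 format="parquet", partitioning=new_part,
                 existing_data_behavior="overwrite_or_ignore",
                 max_partitions=2467,
                 max_open_files=3000)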
The remaining question is: how do I set version="2.6" and data_page_version="2.0" in write_dataset? I'll still wonder why the row group counts are so different if they remain different even with those parameters set, but I'll defer that question.
Answer:
The dataset writer was changed significantly in 7.0.0. Previously, it would always create 1 file per partition. Now there are a couple of settings that could cause it to write multiple files. Furthermore, it looks like you are ending up with a lot of small row groups, which is not ideal and is probably the reason the one-step process is both slower and larger.
The first significant setting is max_open_files. Some systems limit how many file descriptors can be open at one time. Linux defaults to 1024, so pyarrow defaults to ~900 (on the assumption that some file descriptors will be open for scanning, etc.). When this limit is exceeded, pyarrow will close the least recently used file. For some datasets this works well. However, if each batch has data for every file, it doesn't work well at all. In that case you probably want to increase max_open_files to be greater than your number of partitions (with some wiggle room, because you will have some files open for reading too). You may need to adjust OS-specific settings to allow this (generally these OS limits are quite conservative, and raising the limit is fairly harmless).
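If the OS limit itself is the obstacle, one way to check and raise the per-process descriptor limit from Python on Linux/macOS is the standard-library resource module; this is a hedged illustration, not part of the original answer:

import resource

# Current soft/hard limits on open file descriptors for this process.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

# The soft limit can usually be raised up to the hard limit without extra privileges.
if soft < 4096 <= hard:
    resource.setrlimit(resource.RLIMIT_NOFILE, (4096, hard))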
In addition, the 7.0.0 release adds min_rows_per_group, max_rows_per_group, and max_rows_per_file parameters to the write_dataset call. Setting min_rows_per_group to something like 1 million will cause the writer to buffer rows in memory until it has enough to write. This will allow you to create files with 1 row group instead of 188 row groups. This should bring down your file size and fix your performance issues. However, there is a memory cost associated with this, which is going to be min_rows_per_group * num_files * size_of_row_in_bytes.
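A rough sketch of what that could look like for the case above, reusing the variable names from the question (the specific values are illustrative, not from the answer):

# Buffer rows so each file ends up with a few large row groups instead of
# hundreds of tiny ones; keep more files open than there are partitions.
ds.write_dataset(dataset.scanner(), newdatarootdir,
                 format="parquet", partitioning=new_part,
                 existing_data_behavior="overwrite_or_ignore",
                 max_partitions=2467,
                 max_open_files=3000,
                 min_rows_per_group=1_000_000)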
The write_dataset call works on several different formats (e.g. CSV, IPC, ORC), so the call only has generic options that apply regardless of format. Format-specific settings can instead be set using the file_options parameter:
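The answer's own example is not preserved here, but for parquet this plausibly looks like the sketch below, using ParquetFileFormat.make_write_options to carry the version and data_page_version settings asked about in the question (treat the exact keywords as an assumption to verify against the pyarrow docs):

# Build parquet-specific write options and pass them via file_options.
pq_format = ds.ParquetFileFormat()
write_options = pq_format.make_write_options(version="2.6",
                                             data_page_version="2.0")

ds.write_dataset(dataset.scanner(), newdatarootdir,
                 format=pq_format, partitioning=new_part,
                 existing_data_behavior="overwrite_or_ignore",
                 max_partitions=2467,
                 max_open_files=3000,
                 min_rows_per_group=1_000_000,
                 file_options=write_options)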