pyarrow write_dataset每个分区文件限制

发布于 2025-01-17 15:19:28 字数 1996 浏览 3 评论 0原文

假设我有一些数据位于现有数据集中,该数据集中没有列分区,但仍然有 200 多个文件。我想将该数据重写到配置单元分区中。数据集太大,无法将其全部打开到内存中。在本例中,数据集来自 Azure Synapse 中的 CETA,但我认为这并不重要。

我这样做:

dataset=ds.dataset(datadir, format="parquet", filesystem=abfs)
new_part = ds.partitioning(pa.schema([("nodename", pa.string())]), flavor="hive")
scanner=dataset.scanner()
ds.write_dataset(scanner, newdatarootdir, 
                    format="parquet", partitioning=new_part,
                    existing_data_behavior="overwrite_or_ignore",
                    max_partitions=2467)

在本例中,有 2467 个唯一的节点名,因此我希望有 2467 个目录,每个目录有 1 个文件。然而我得到的是 2467 个目录,每个目录有 100 多个文件,每个文件大约 10KB。如何为每个分区获取 1 个文件?

我可以做第二步

for node in nodes:
    fullpath=f"{datadir}\\{node}"
    curnodeds=ds.dataset(fullpath, format="parquet")
    curtable=curnodeds.to_table()
    os.makedirs(f"{newbase}\\{node}")
    pq.write_table(curtable, f"{newbase}\\{node}\\part-0.parquet",
                version="2.6", flavor="spark", data_page_version="2.0")

有没有办法将第二步合并到第一步中?

编辑:

这是使用 pyarrow 7.0.0

编辑(2):

使用 max_open_files=3000 确实导致每个分区一个文件。两者之间的元数据比较是,两步方法(对于一个分区)...

<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A40>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 1
format_version: 2.6
serialized_size: 1673

size on disk: 278kb

和一步...

<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A90>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 188
format_version: 1.0
serialized_size: 148313

size on disk: 1.04MB

显然,在两步版本中,我明确将版本设置为 2.6,以便解释这一差异。第 2 步后的数据总大小为 655MB,第 1 步后数据总大小为 2.6GB。时间差异也相当大。两步大约是第一步20分钟,第二步40分钟。整个过程只需要 5 分钟。

剩下的问题是,如何在write_dataset中设置version="2.6"data_page_version="2.0"?我仍然想知道为什么 row_groups 如此不同,如果它们在设置这些参数时如此不同,但我会推迟这个问题。

Let's say I've got some data which is in an existing dataset which has no column partitioning but is still 200+ files. I want to rewrite that data into a hive partition. The dataset is too big to open its entirety into memory. In this case the dataset came from a CETAs in Azure Synapse but I don't think that matters.

I do:

dataset=ds.dataset(datadir, format="parquet", filesystem=abfs)
new_part = ds.partitioning(pa.schema([("nodename", pa.string())]), flavor="hive")
scanner=dataset.scanner()
ds.write_dataset(scanner, newdatarootdir, 
                    format="parquet", partitioning=new_part,
                    existing_data_behavior="overwrite_or_ignore",
                    max_partitions=2467)

In this case there are 2467 unique nodenames and so I want there to be 2467 directories each with 1 file. However what I get is 2467 directories each with 100+ files of about 10KB each. How do I get just 1 file per partition?

I could do a second step

for node in nodes:
    fullpath=f"{datadir}\\{node}"
    curnodeds=ds.dataset(fullpath, format="parquet")
    curtable=curnodeds.to_table()
    os.makedirs(f"{newbase}\\{node}")
    pq.write_table(curtable, f"{newbase}\\{node}\\part-0.parquet",
                version="2.6", flavor="spark", data_page_version="2.0")

Is there a way to incorporate the second step into the first step?

Edit:

This is with pyarrow 7.0.0

Edit(2):

Using max_open_files=3000 did result in one file per partition. The metadata comparison between the two are that the two step approach has (for one partition)...

<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A40>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 1
format_version: 2.6
serialized_size: 1673

size on disk: 278kb

and the one step...

<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A90>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 188
format_version: 1.0
serialized_size: 148313

size on disk: 1.04MB

Obviously in the two step version I'm explicitly setting the version to 2.6 so that explains that difference. The total size of my data after the 2-step is 655MB vs the 1 step is 2.6GB. The time difference was pretty significant too. The two step was something like 20 minutes for the first step and 40 minutes for the second. The one step was like 5 minutes for the whole thing.

The remaining question is, how to set version="2.6" and data_page_version="2.0" in write_dataset? I'll still wonder why the row_groups is so different if they're so different when setting those parameters but I'll defer that question.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

二货你真萌 2025-01-24 15:19:28

数据集作者在7.0.0中显着更改。以前,它始终每个分区创建1个文件。现在有几个设置可能导致其编写多个文件。此外,看来您最终要获得许多小排组,这是不理想的,这可能是一步过程既较慢又较大的原因。

第一个重要的设置是max_open_files。一些系统限制了一次可以打开多少个文件描述符。 Linux默认为1024,因此Pyarrow尝试默认为〜900(假设某些文件描述符将打开以进行扫描等)。对于某些数据集,这效果很好。但是,如果每个批次都有每个文件的数据,则根本无法正常工作。在这种情况下,您可能想增加max_open_files比您的分区数大(带有一些摆动室,因为您也将有一些可以打开的文件以读取)。您可能需要调整特定于OS的设置以允许此设置(通常,这些OS限制非常保守,并且提高此限制是无害的)。

我仍然想知道为什么row_groups在设置这些参数时如此不同,但我会推迟这个问题。

此外,7.0.0版本添加min_rows_per_groupmax_rows_per_groupmax_rows_per_per_file参数参数到write> write_dataset呼叫。设置min_rows_per_group到100万之类的东西将导致作者在内存中缓冲行,直到足够写入为止。这将使您可以使用1行组而不是188行组创建文件。这应该降低您的文件大小并解决您的性能问题。

但是,与此相关的内存成本将为min_rows_per_group * num_files * size_of_row_in_bytes

剩下的问题是,如何设置版本=“ 2.6”和data_page_version =“ 2.0” write_dataset?

write_dataset呼叫以几种不同格式(例如CSV,IPC,ORC)起作用,因此调用仅具有适用于格式的通用选项。

可以使用file_options参数设置格式特定的设置:

# It does not appear to be documented but make_write_options
# should accept most of the kwargs that write_table does
file_options = ds.ParquetFileFormat().make_write_options(version='2.6', data_page_version='2.0')
ds.write_dataset(..., file_options=file_options)

The dataset writer was changed significantly in 7.0.0. Previously, it would always create 1 file per partition. Now there are a couple of settings that could cause it to write multiple files. Furthermore, it looks like you are ending up with a lot of small row groups which is not ideal and probably the reason the one-step process is both slower and larger.

The first significant setting is max_open_files. Some systems limit how many file descriptors can be open at one time. Linux defaults to 1024 and so pyarrow attempts defaults to ~900 (with the assumption that some file descriptors will be open for scanning, etc.) When this limit is exceeded pyarrow will close the least recently used file. For some datasets this works well. However, if each batch has data for each file this doesn't work well at all. In that case you probably want to increase max_open_files to be greater than your number of partitions (with some wiggle room because you will have some files open for reading too). You may need to adjust OS-specific settings to allow this (generally, these OS limits are pretty conservative and raising this limit is fairly harmless).

I'll still wonder why the row_groups is so different if they're so different when setting those parameters but I'll defer that question.

In addition, the 7.0.0 release adds min_rows_per_group, max_rows_per_group and max_rows_per_file parameters to the write_dataset call. Setting min_rows_per_group to something like 1 million will cause the writer to buffer rows in memory until it has enough to write. This will allow you to create files with 1 row group instead of 188 row groups. This should bring down your file size and fix your performance issues.

However, there is a memory cost associated with this which is going to be min_rows_per_group * num_files * size_of_row_in_bytes.

The remaining question is, how to set version="2.6" and data_page_version="2.0" in write_dataset?

The write_dataset call works on several different formats (e.g. csv, ipc, orc) and so the call only has generic options that apply regardless of format.

Format-specific settings can instead be set using the file_options parameter:

# It does not appear to be documented but make_write_options
# should accept most of the kwargs that write_table does
file_options = ds.ParquetFileFormat().make_write_options(version='2.6', data_page_version='2.0')
ds.write_dataset(..., file_options=file_options)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文