Concatenating two very large dask dataframes



I have many large datasets (large as in larger than the RAM) on which I want to perform filters, joins and concats. Pandas failed since each dataset (around 25GB) is larger than my RAM (16GB).
I am now trying to use Dask instead of Pandas. Each large dataset is now stored in chunks as a series of parquet files.

There are three scenarios:

  1. Perform operations such as filter and join on the large dataset and store the result to disk.
  2. Concatenate two or more large dask dataframes and store the result back to disk (a minimal sketch follows this list).
  3. Perform 1 and 2 together.
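
For concreteness, here is the minimal sketch of scenario 2 referred to above, with hypothetical input and output paths, writing the result with to_parquet() instead of computing it:

import dask.dataframe as dd

# Read both large datasets lazily; nothing is pulled into RAM yet.
df1 = dd.read_parquet('data1/*.parquet')   # hypothetical path
df2 = dd.read_parquet('data2/*.parquet')   # hypothetical path

# Row-wise concatenation is a lazy, cheap operation for Dask.
combined = dd.concat([df1, df2])

# Write the result back to disk partition by partition, so the full
# dataframe never has to be materialised with .compute().
combined.to_parquet('combined_output/')    # hypothetical output path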

It is important to note that, in most cases, the result of any of the above three scenarios is still larger than the RAM, so .compute() is not desirable.

What I've tried:

Lazily build up all calculations and then write the results to disk with dask.dataframe.to_parquet() instead of calling .compute(). This works for a few cases such as filtering, but fails for operations such as sort, union and drop_duplicates.
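
As an illustration, the filtering case that does work looks roughly like this (paths and the filter column are placeholders):

import dask.dataframe as dd

df = dd.read_parquet('path_to_data1/data1.*.parquet')

# A row-wise filter is applied partition by partition, so memory use stays
# around the size of a single partition while the output streams to disk.
filtered = df[df['some_column'] > 0]        # placeholder condition
filtered.to_parquet('filtered_output/')     # placeholder output path

# By contrast, operations such as sort_values or drop_duplicates need to
# shuffle data between partitions, and this is where the memory errors appear:
# df.drop_duplicates().to_parquet('dedup_output/')   # fails with a memory error for me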

What I am trying to solve:

A failproof way to obtain the result and store it to disk without causing memory errors.

Example

import dask.dataframe as dd

# Both inputs are larger than RAM and are read lazily from partitioned parquet files.
dask_df1 = dd.read_parquet('path_to_data1/data1.*.parquet')
dask_df2 = dd.read_parquet('path_to_data2/data2.*.parquet')

union_df = dd.concat([dask_df1, dask_df2])
filtered = union_df[ *some filtering condition* ]
result = filtered.merge( *with some other large dask dataframe* )

Naturally, result will also be larger than memory; calling result.compute() raises a MemoryError.
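
What I run instead of result.compute() is roughly the following (the output path is a placeholder), and this is the step that still runs out of memory once a merge or sort is involved:

# Write the merged result straight to disk instead of materialising it in RAM.
# As far as I can tell, the merge still shuffles both dataframes between
# partitions under the hood, and that shuffle is what exhausts the 16GB of RAM.
result.to_parquet('result_output/')   # placeholder output path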
