Concatenating two very large dask dataframes
I have many large datasets (large as in larger than RAM) on which I want to perform filters, joins, and concatenations. Pandas failed since each dataset (around 25GB) is larger than my RAM (16GB).
I am now trying to use Dask instead of Pandas. Each large dataset is now stored in chunks as a series of parquet files.
There are three scenarios:
- Perform operations such as filter and join on the large dataset and store the result to disk.
- Concatenate two or more large dask dataframes and store the result back to disk.
- Perform 1 and 2 together.
It is important to note that, in most cases, the result of any of the above three scenarios is still larger than RAM, hence .compute() is not desirable.
What I've tried:
Perform all calculations lazily and write the results to disk with dask.dataframe.to_parquet() instead of calling .compute(). This works for a few cases such as filtering, but fails for operations such as sort, union, and drop_duplicates.
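A minimal sketch of this lazy-write pattern, using a hypothetical column name and output directory (the filter-only case, which is the one that works for me):

```python
import dask.dataframe as dd

# Read the chunked parquet files lazily; nothing is loaded into RAM yet.
df = dd.read_parquet('path_to_data1/data1.*.parquet')

# A plain row filter stays lazy as well ('value' is a hypothetical column).
filtered = df[df['value'] > 0]

# to_parquet() streams the result to disk partition by partition,
# so the filtered output never has to fit in memory at once.
filtered.to_parquet('filtered_output/')
```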
What I am trying to solve:
A foolproof way to obtain the result and store it to disk without causing memory errors.
Example
```python
import dask.dataframe as dd

dask_df1 = dd.read_parquet('path_to_data1/data1.*.parquet')
dask_df2 = dd.read_parquet('path_to_data2/data2.*.parquet')
union_df = dd.concat([dask_df1, dask_df2])
filtered = union_df[...]       # some filtering condition
result = filtered.merge(...)   # with some other large dask dataframe
```
It is trivial that result, too, will be larger than memory. Calling result.compute() would raise a MemoryError.
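The write step I use in place of .compute() is sketched below (the output directory is a placeholder); as described above, this works for simple filters but still fails with memory errors once shuffle-heavy steps such as the union/merge are involved:

```python
# Stream the (still lazy) merged result to disk instead of
# materialising it in memory with result.compute().
# 'result_output/' is a placeholder path.
result.to_parquet('result_output/')
```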