Why does Dask's map_partitions use more memory than looping over the partitions?

Posted 2025-02-08 22:38:14


I have a parquet file of position data for vehicles that is indexed by vehicle ID and sorted by timestamp. I want to read the parquet file, do some calculations on each partition (not aggregations) and then write the output directly to a new parquet file of similar size.

I organized my data and wrote my code (below) to use Dask's map_partitions, as I understood this would perform the operations one partition at a time, saving each result to disk sequentially and thereby minimizing memory usage. I was surprised to find that this exceeded my available memory. If I instead run my code on a single partition at a time in a loop, appending each output to the new parquet file (see the second code block below), it easily fits within memory.

Is there something incorrect in the original way I used map_partitions? If not, why does it use so much more memory? What is the proper, most efficient way of achieving what I want?

Thanks in advance for any insight!!

Original (memory hungry) code:

import dask.dataframe as dd

ddf = dd.read_parquet(input_file)
meta_dict = ddf.dtypes.to_dict()

(
    ddf
    .map_partitions(my_function, meta = meta_dict)
    .to_parquet(
        output_file,
        append = False,
        overwrite = True,
        engine = 'fastparquet'
    )
)

Awkward looped (but more memory friendly) code:

ddf = dd.read_parquet(input_file)

for partition in range(ddf.npartitions):
    partition_df = ddf.partitions[partition]
    (
        my_function(partition_df)
        .to_parquet(
            output_file,
            append = True,
            overwrite = False,
            engine = 'fastparquet'
        )
    )

More hardware and data details:
The total input parquet file is around 5GB and is split into 11 partitions of up to 900MB. It is indexed by ID with divisions so I can do vehicle grouped operations without working across partitions. The laptop I'm using has 16GB RAM and 19GB swap. The original code uses all of both, while the looped version fits within RAM.

Comments (1)

万水千山粽是情ミ 2025-02-15 22:38:14


As @MichaelDelgado pointed out, by default Dask will spin up multiple workers/threads according to what is available on the machine. With the size of the partitions I have, this maxes out the available memory when using the map_partitions approach. To avoid this, I limited the number of workers and the number of threads per worker to prevent automatic parallelization using the code below, and the task fit in memory.

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(
    n_workers = 1,
    threads_per_worker = 1)
client = Client(cluster)