如何使用上游数据集上的哈希分区在groupby.applyinpandas上进行排序?

发布于 2025-02-12 04:45:46 字数 6242 浏览 3 评论 0原文

在我的主要变换中,我正在通过执行groupby,然后在Foundry中使用 ApplionInpandas 来运行算法。该构建需要很长时间,一个想法是组织文件以防止使用哈希分区/存储措施进行整理。

对于MCVE,我有以下数据集:

def example_df():
    return spark.createDataFrame(
    [("1","2", 1.0), ("1","3", 2.0), ("2","4", 3.0), ("2","5", 5.0), ("2","2", 10.0)],
    ("id_1","id_2", "v")) 

”在此处输入图像描述

我要应用的转换是:

def df1(example_df):
    def subtract_mean(pdf):
        v = pdf.v
        return pdf.assign(v=v - v.mean())

    return example_df.groupby("id_1","id_2").applyInPandas(subtract_mean, schema="id_1 string, id_2 string, v double")

当我查看原始查询时在没有分区的情况下进行计划,它看起来如下:

”在此处输入图像描述

物理计划:

Execute FoundrySaveDatasetCommand `ri.foundry.main.transaction.00000059-eb1b-61f4-bdb8-a030ac6baf0a@master`.`ri.foundry.main.dataset.eb664037-fcae-4ce2-b92b-bd103cd504b3`, ErrorIfExists, [id_1, id_2, v], ComputedStatsServiceV2Blocking{_endpointChannelFactory=DialogueChannel@3127a629{channelName=dialogue-nonreloading-ComputedStatsServiceV2Blocking, delegate=com.palantir.dialogue.core.DialogueChannel$Builder$$Lambda$713/0x0000000800807c40@70f51090}, runtime=com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime@6c67a62a}, com.palantir.foundry.spark.catalog.caching.CachingSchemaService@7d881feb, com.palantir.foundry.spark.catalog.caching.CachingMetadataService@57a1ef9e, com.palantir.foundry.spark.catalog.FoundrySparkResolver@4d38f6f5, com.palantir.foundry.spark.auth.DefaultFoundrySparkAuthSupplier@21103ab4
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      *(3) BasicStats `ri.foundry.main.transaction.00000059-eb1b-61f4-bdb8-a030ac6baf0a@master`.`ri.foundry.main.dataset.eb664037-fcae-4ce2-b92b-bd103cd504b3`
      +- FlatMapGroupsInPandas [id_1#487, id_2#488], subtract_mean(id_1#487, id_2#488, v#489), [id_1#497, id_2#498, v#499]
         +- *(2) Sort [id_1#487 ASC NULLS FIRST, id_2#488 ASC NULLS FIRST], false, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 0
                  +- Exchange hashpartitioning(id_1#487, id_2#488, 200), ENSURE_REQUIREMENTS, [id=#324]
                     +- *(1) Project [id_1#487, id_2#488, id_1#487, id_2#488, v#489]
                        +- *(1) ColumnarToRow
                           +- FileScan parquet !ri.foundry.main.transaction.00000059-eb12-f234-b25f-57e967fbc68e:ri.foundry.main.transaction.00000059-eb12-f234-b25f-57e967fbc68e@00000003-99f9-3d2d-814f-e4db9c920cc2:master.ri.foundry.main.dataset.237cddc5-0835-425c-bfbe-e62c51779dc2[id_1#487,id_2#488,v#489] Batched: true, BucketedScan: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[sparkfoundry:///datasets/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id_1:string,id_2:string,v:double>, ScanMode: RegularMode

我的目标是防止需要sortshuffle(读取和查询)和Exchange从查询计划中。

为了实现这一目标,我将在一个中间数据集进行分区,以ID列来划分我要稍后将要组合的ID列:

def example_df_bucketed(example_df):
    example_df = example_df.repartition(2,"id_1","id_2")
    output = Transforms.get_output()
    output_fs = output.filesystem()
    output.write_dataframe(example_df,bucket_cols=["id_1","id_2"], sort_by=["id_1","id_2"], bucket_count=2)

我尝试运行相同的逻辑,这次是带有桶的数据集,因为输入

def df2(example_df_bucketed):
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=v - v.mean())

    return example_df_bucketed.groupby("id_1","id_2").applyInPandas(subtract_mean, schema="id_1 string, id_2 string, v double")

会导致查询计划中没有一个洗牌(哈希分区),但仍在整理。

物理计划:

Execute FoundrySaveDatasetCommand `ri.foundry.main.transaction.00000059-ec4c-26f7-a058-98be3f26018c@master`.`ri.foundry.main.dataset.02990b20-95f2-4605-9e7c-578ba071535d`, ErrorIfExists, [id_1, id_2, v], ComputedStatsServiceV2Blocking{_endpointChannelFactory=DialogueChannel@3127a629{channelName=dialogue-nonreloading-ComputedStatsServiceV2Blocking, delegate=com.palantir.dialogue.core.DialogueChannel$Builder$$Lambda$713/0x0000000800807c40@70f51090}, runtime=com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime@6c67a62a}, com.palantir.foundry.spark.catalog.caching.CachingSchemaService@3db2ee77, com.palantir.foundry.spark.catalog.caching.CachingMetadataService@8086bb, com.palantir.foundry.spark.catalog.FoundrySparkResolver@7ebc329, com.palantir.foundry.spark.auth.DefaultFoundrySparkAuthSupplier@46d15de1
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      *(2) BasicStats `ri.foundry.main.transaction.00000059-ec4c-26f7-a058-98be3f26018c@master`.`ri.foundry.main.dataset.02990b20-95f2-4605-9e7c-578ba071535d`
      +- FlatMapGroupsInPandas [id_1#603, id_2#604], subtract_mean(id_1#603, id_2#604, v#605), [id_1#613, id_2#614, v#615]
         +- *(1) Sort [id_1#603 ASC NULLS FIRST, id_2#604 ASC NULLS FIRST], false, 0
            +- *(1) Project [id_1#603, id_2#604, id_1#603, id_2#604, v#605]
               +- *(1) ColumnarToRow
                  +- FileScan parquet !ri.foundry.main.transaction.00000059-ec22-2287-a0e3-5d9c48a39a83:ri.foundry.main.transaction.00000059-ec22-2287-a0e3-5d9c48a39a83@00000003-99fc-8b63-8d17-b7e45fface86:master.ri.foundry.main.dataset.bbada128-5538-4c7f-b5ba-6d16b15da5bf[id_1#603,id_2#604,v#605] Batched: true, BucketedScan: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[sparkfoundry://foundry/datasets/..., PartitionFilters: [], Partitioning: hashpartitioning(id_1#603, id_2#604, 2), PushedFilters: [], ReadSchema: struct<id_1:string,id_2:string,v:double>, ScanMode: RegularMode, SelectedBucketsCount: 2 out of 2

由于我已经设置sort_by在上游时,为什么仍然存在查询计划中的一种?我可以做些什么来避免这种情况?

In my main transform, I'm running an algorithm by doing a groupby and then applyInPandas in Foundry. The build takes very long, and one idea is to organize the files to prevent shuffle reads and sorting, using Hash partitioning/bucketing.

For a mcve, I have the following dataset:

def example_df():
    return spark.createDataFrame(
    [("1","2", 1.0), ("1","3", 2.0), ("2","4", 3.0), ("2","5", 5.0), ("2","2", 10.0)],
    ("id_1","id_2", "v")) 

enter image description here

The transform I want to apply is:

def df1(example_df):
    def subtract_mean(pdf):
        v = pdf.v
        return pdf.assign(v=v - v.mean())

    return example_df.groupby("id_1","id_2").applyInPandas(subtract_mean, schema="id_1 string, id_2 string, v double")

When I look at the original query plan, with no partitioning, it looks like the following:

enter image description here

Physical Plan:

Execute FoundrySaveDatasetCommand `ri.foundry.main.transaction.00000059-eb1b-61f4-bdb8-a030ac6baf0a@master`.`ri.foundry.main.dataset.eb664037-fcae-4ce2-b92b-bd103cd504b3`, ErrorIfExists, [id_1, id_2, v], ComputedStatsServiceV2Blocking{_endpointChannelFactory=DialogueChannel@3127a629{channelName=dialogue-nonreloading-ComputedStatsServiceV2Blocking, delegate=com.palantir.dialogue.core.DialogueChannel$Builder$Lambda$713/0x0000000800807c40@70f51090}, runtime=com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime@6c67a62a}, com.palantir.foundry.spark.catalog.caching.CachingSchemaService@7d881feb, com.palantir.foundry.spark.catalog.caching.CachingMetadataService@57a1ef9e, com.palantir.foundry.spark.catalog.FoundrySparkResolver@4d38f6f5, com.palantir.foundry.spark.auth.DefaultFoundrySparkAuthSupplier@21103ab4
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      *(3) BasicStats `ri.foundry.main.transaction.00000059-eb1b-61f4-bdb8-a030ac6baf0a@master`.`ri.foundry.main.dataset.eb664037-fcae-4ce2-b92b-bd103cd504b3`
      +- FlatMapGroupsInPandas [id_1#487, id_2#488], subtract_mean(id_1#487, id_2#488, v#489), [id_1#497, id_2#498, v#499]
         +- *(2) Sort [id_1#487 ASC NULLS FIRST, id_2#488 ASC NULLS FIRST], false, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 0
                  +- Exchange hashpartitioning(id_1#487, id_2#488, 200), ENSURE_REQUIREMENTS, [id=#324]
                     +- *(1) Project [id_1#487, id_2#488, id_1#487, id_2#488, v#489]
                        +- *(1) ColumnarToRow
                           +- FileScan parquet !ri.foundry.main.transaction.00000059-eb12-f234-b25f-57e967fbc68e:ri.foundry.main.transaction.00000059-eb12-f234-b25f-57e967fbc68e@00000003-99f9-3d2d-814f-e4db9c920cc2:master.ri.foundry.main.dataset.237cddc5-0835-425c-bfbe-e62c51779dc2[id_1#487,id_2#488,v#489] Batched: true, BucketedScan: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[sparkfoundry:///datasets/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id_1:string,id_2:string,v:double>, ScanMode: RegularMode

My goal is to prevent the need for the Sort, Shuffle (read and query) and Exchange from the query plan.

To achieve this, I hash partition an intermediate dataset, bucketing by the id columns I'm going to groupBy later:

def example_df_bucketed(example_df):
    example_df = example_df.repartition(2,"id_1","id_2")
    output = Transforms.get_output()
    output_fs = output.filesystem()
    output.write_dataframe(example_df,bucket_cols=["id_1","id_2"], sort_by=["id_1","id_2"], bucket_count=2)

I try and run the same logic, this time with the bucketed dataset as the input

def df2(example_df_bucketed):
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=v - v.mean())

    return example_df_bucketed.groupby("id_1","id_2").applyInPandas(subtract_mean, schema="id_1 string, id_2 string, v double")

This results in the query plan not having a shuffle (hash partition), but it is still sorting.

enter image description here

Physical Plan:

Execute FoundrySaveDatasetCommand `ri.foundry.main.transaction.00000059-ec4c-26f7-a058-98be3f26018c@master`.`ri.foundry.main.dataset.02990b20-95f2-4605-9e7c-578ba071535d`, ErrorIfExists, [id_1, id_2, v], ComputedStatsServiceV2Blocking{_endpointChannelFactory=DialogueChannel@3127a629{channelName=dialogue-nonreloading-ComputedStatsServiceV2Blocking, delegate=com.palantir.dialogue.core.DialogueChannel$Builder$Lambda$713/0x0000000800807c40@70f51090}, runtime=com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime@6c67a62a}, com.palantir.foundry.spark.catalog.caching.CachingSchemaService@3db2ee77, com.palantir.foundry.spark.catalog.caching.CachingMetadataService@8086bb, com.palantir.foundry.spark.catalog.FoundrySparkResolver@7ebc329, com.palantir.foundry.spark.auth.DefaultFoundrySparkAuthSupplier@46d15de1
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      *(2) BasicStats `ri.foundry.main.transaction.00000059-ec4c-26f7-a058-98be3f26018c@master`.`ri.foundry.main.dataset.02990b20-95f2-4605-9e7c-578ba071535d`
      +- FlatMapGroupsInPandas [id_1#603, id_2#604], subtract_mean(id_1#603, id_2#604, v#605), [id_1#613, id_2#614, v#615]
         +- *(1) Sort [id_1#603 ASC NULLS FIRST, id_2#604 ASC NULLS FIRST], false, 0
            +- *(1) Project [id_1#603, id_2#604, id_1#603, id_2#604, v#605]
               +- *(1) ColumnarToRow
                  +- FileScan parquet !ri.foundry.main.transaction.00000059-ec22-2287-a0e3-5d9c48a39a83:ri.foundry.main.transaction.00000059-ec22-2287-a0e3-5d9c48a39a83@00000003-99fc-8b63-8d17-b7e45fface86:master.ri.foundry.main.dataset.bbada128-5538-4c7f-b5ba-6d16b15da5bf[id_1#603,id_2#604,v#605] Batched: true, BucketedScan: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[sparkfoundry://foundry/datasets/..., PartitionFilters: [], Partitioning: hashpartitioning(id_1#603, id_2#604, 2), PushedFilters: [], ReadSchema: struct<id_1:string,id_2:string,v:double>, ScanMode: RegularMode, SelectedBucketsCount: 2 out of 2

Since I'm already setting the sort_by when I bucket upstream, why is there still a sort in the query plan? Is there something I can do to avoid this sort?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文