Pyspark:如何对倾斜聚合使用盐化技术

发布于 2025-01-09 12:41:27 字数 958 浏览 3 评论 0原文

如何在 Pyspark 中使用盐化技术进行倾斜聚合。

假设我们有倾斜的数据,如下所示,如何创建盐列并在聚合中使用它。

城邦3,00,000数量
Lachung锡金3,000
Rangpo锡金50,000
甘托克锡金班加罗尔
卡纳塔克邦2,50,00,000孟买
马哈拉施特拉邦2,90,00,000

How to use salting technique for Skewed Aggregation in Pyspark.

Say we have Skewed data like below how to create salting column and use it in aggregation.

citystatecount
LachungSikkim3,000
RangpoSikkim50,000
GangtokSikkim3,00,000
BangaloreKarnataka2,50,00,000
MumbaiMaharashtra2,90,00,000

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

随风而去 2025-01-16 12:41:27

要对倾斜数据使用加盐技术,我们需要创建一个名为“盐”的列。生成一个范围从 0 到 (spark.sql.shuffle.partitions - 1) 的随机编号。

表应如下所示,其中“salt”列的值从 0 到 199(在本例中分区大小为 200)。现在您可以对“城市”、“州”、“盐”使用 groupBy。

城邦锡金盐 拉
锡金151
拉冲锡金102
锡金16
朗波锡金5
朗波锡金19
托克朗波锡金16
托克锡金119
锡金55
16102
甘托克朗波锡金
托克锡金10
班加罗尔卡纳塔克邦19
MumbaiMaharashtra0
BangaloreKarnataka199
MumbaiMaharashtra190

代码:

from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import (
    StructType, StructField, IntegerType
)

salval = f.round(f.rand() * int(spark.conf.get("spark.sql.shuffle.partitions")) -1 )

record_df.withColumn("salt", f.lit(salval).cast(IntegerType()))\
    .groupBy("city", "state", "salt")\
    .agg(
      f.count("city")
    )\
    .drop("salt")

输出

城邦计数Mumbai
LachungSikkim3,000
RangpoSikkim50,000
GangtokSikkim3,00,000
BangaloreKarnataka2,50,00,000
Maharashtra2,90,00,000

To use the salting technique on skewed data, we need to create a column say "salt". Generate a random no with a range from 0 to (spark.sql.shuffle.partitions - 1).

Table should look like below, where "salt" column will have value from 0 to 199 (as in this case partitions size is 200). Now you can use groupBy on "city", "state", "salt".

citystatesalt
LachungSikkim151
LachungSikkim102
LachungSikkim16
RangpoSikkim5
RangpoSikkim19
RangpoSikkim16
RangpoSikkim102
GangtokSikkim55
GangtokSikkim119
GangtokSikkim16
GangtokSikkim10
BangaloreKarnataka19
MumbaiMaharashtra0
BangaloreKarnataka199
MumbaiMaharashtra190

code:

from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import (
    StructType, StructField, IntegerType
)

salval = f.round(f.rand() * int(spark.conf.get("spark.sql.shuffle.partitions")) -1 )

record_df.withColumn("salt", f.lit(salval).cast(IntegerType()))\
    .groupBy("city", "state", "salt")\
    .agg(
      f.count("city")
    )\
    .drop("salt")

output:

citystatecount
LachungSikkim3,000
RangpoSikkim50,000
GangtokSikkim3,00,000
BangaloreKarnataka2,50,00,000
MumbaiMaharashtra2,90,00,000
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文