Window function with PySpark

Posted on 2025-01-15 05:23:23

I have a PySpark DataFrame, and my goal is to create a Flag column whose value depends on the Amount column.
Basically, for each Group, I want to know whether any of the first three months has an amount greater than 0. If so, the Flag column should be 1 for every row of that group; otherwise it should be 0.

Here is an example to make this clearer.

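Initial PySpark DataFrame:

+-----+-----+------+
|Group|Month|Amount|
+-----+-----+------+
|    A|    1|     0|
|    A|    2|     0|
|    A|    3|    35|
|    A|    4|     0|
|    A|    5|     0|
|    B|    1|     0|
|    B|    2|     0|
|    C|    1|     0|
|    C|    2|     0|
|    C|    3|     0|
|    C|    4|    13|
|    D|    1|     0|
|    D|    2|    24|
|    D|    3|     0|
+-----+-----+------+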

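Final PySpark DataFrame:

+-----+-----+------+----+
|Group|Month|Amount|Flag|
+-----+-----+------+----+
|    A|    1|     0|   1|
|    A|    2|     0|   1|
|    A|    3|    35|   1|
|    A|    4|     0|   1|
|    A|    5|     0|   1|
|    B|    1|     0|   0|
|    B|    2|     0|   0|
|    C|    1|     0|   0|
|    C|    2|     0|   0|
|    C|    3|     0|   0|
|    C|    4|    13|   0|
|    D|    1|     0|   1|
|    D|    2|    24|   1|
|    D|    3|     0|   1|
+-----+-----+------+----+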

In other words, for each group I want to sum the Amount over the first 3 months. If that sum is greater than 0, the flag is 1 for every row of the group; otherwise it is 0.

Answers (2)

孤单情人 2025-01-22 05:23:23

You can create the flag column with a window function. Build a pseudo-column that is 1 when a row meets the criteria (Month 1 to 3 and Amount greater than 0) and 0 otherwise, then sum it over each group. If the sum is greater than 0, at least one row met the criteria, so the flag is set to 1.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window as W

spark = SparkSession.builder.getOrCreate()

data = [("A", 1, 0, ), 
("A", 2, 0, ), 
("A", 3, 35, ), 
("A", 4, 0, ), 
("A", 5, 0, ), 
("B", 1, 0, ), 
("B", 2, 0, ), 
("C", 1, 0, ), 
("C", 2, 0, ), 
("C", 3, 0, ), 
("C", 4, 13, ), 
("D", 1, 0, ), 
("D", 2, 24, ), 
("D", 3, 0, ), ]

df = spark.createDataFrame(data, ("Group", "Month", "Amount", ))

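# Frame spans the whole Group partition, so every row sees all rows of its group
ws = W.partitionBy("Group").orderBy("Month").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

# Pseudo-column: 1 if the row is in months 1-3 and has a positive Amount, else 0
criteria = F.when((F.col("Month") < 4) & (F.col("Amount") > 0), F.lit(1)).otherwise(F.lit(0))

# A positive sum means at least one matching row, so flag = 1 for the whole group
(df.withColumn("flag", F.when(F.sum(criteria).over(ws) > 0, F.lit(1)).otherwise(F.lit(0)))
).show()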

"""
+-----+-----+------+----+
|Group|Month|Amount|flag|
+-----+-----+------+----+
|    A|    1|     0|   1|
|    A|    2|     0|   1|
|    A|    3|    35|   1|
|    A|    4|     0|   1|
|    A|    5|     0|   1|
|    B|    1|     0|   0|
|    B|    2|     0|   0|
|    C|    1|     0|   0|
|    C|    2|     0|   0|
|    C|    3|     0|   0|
|    C|    4|    13|   0|
|    D|    1|     0|   1|
|    D|    2|    24|   1|
|    D|    3|     0|   1|
+-----+-----+------+----+
"""

半世蒼涼 2025-01-22 05:23:23

You can use a window function together with count and when.

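from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy('Group')
# count() skips the NULLs that when() returns for non-matching rows
df = (df.withColumn('Flag', F.count(
        F.when((F.col('Month') < 4) & (F.col('Amount') > 0), True)).over(w))
      .withColumn('Flag', F.when(F.col('Flag') > 0, 1).otherwise(0)))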