PySpark - grouping and aggregation with several conditions


I want to group and aggregate data with several conditions. The dataframe contains a product id, fault codes, date and a fault type. Here, I prepared a sample dataframe:

from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType
from datetime import datetime, date

data  = [("prod_001","fault_01",date(2020, 6, 4),"minor"),
         ("prod_001","fault_03",date(2020, 7, 2),"minor"),
         ("prod_001","fault_09",date(2020, 7, 14),"minor"),
         ("prod_001","fault_01",date(2020, 7, 14),"minor"),
         ("prod_001",None,date(2021, 4, 6),"major"),
         ("prod_001","fault_02",date(2021, 6, 22),"minor"),
         ("prod_001","fault_09",date(2021, 8, 1),"minor"),
         
         ("prod_002","fault_01",date(2020, 6, 13),"minor"),
         ("prod_002","fault_05",date(2020, 7, 11),"minor"),
         ("prod_002",None,date(2020, 8, 1),"major"),
         ("prod_002","fault_01",date(2021, 4, 15),"minor"),
         ("prod_002","fault_02",date(2021, 5, 11),"minor"),
         ("prod_002","fault_03",date(2021, 5, 13),"minor"),
  ]

schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("fault_code", StringType(), True),
    StructField("date", DateType(), True),
    StructField("fault_type", StringType(), True),
])

df = spark.createDataFrame(data=data, schema=schema)
display(df)

In general I would like to group by product_id and then aggregate the fault_codes into lists over the dates. The special part here is that the aggregation into a list continues until the fault_type changes from minor to major; the row tagged major adopts the last state of the aggregation (see screenshot). Within one product_id the list aggregation should then start over, with the next fault_code that is flagged as minor.

see target output here
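
The screenshot is not included here; reconstructed from the description above (and consistent with the first answer below), the expected result for prod_001 would look roughly like this:

+----------+----------------------------------------+----------+----------+
|product_id|fault_code_list                         |date      |fault_type|
+----------+----------------------------------------+----------+----------+
|prod_001  |[fault_01]                              |2020-06-04|minor     |
|prod_001  |[fault_01, fault_03]                    |2020-07-02|minor     |
|prod_001  |[fault_01, fault_03, fault_09]          |2020-07-14|minor     |
|prod_001  |[fault_01, fault_03, fault_09, fault_01]|2020-07-14|minor     |
|prod_001  |[fault_01, fault_03, fault_09, fault_01]|2021-04-06|major     |
|prod_001  |[fault_02]                              |2021-06-22|minor     |
|prod_001  |[fault_02, fault_09]                    |2021-08-01|minor     |
+----------+----------------------------------------+----------+----------+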

In some other posts I found the following code snippet, which I already tried. Unfortunately I haven't managed the full aggregation with all conditions yet.

df.sort("product_id", "date").groupby("product_id", "date").agg(F.collect_list("fault_code"))

Edit:

I got a little bit closer with Window.partitionBy(), but I am still not able to restart the collect_list() once the fault_type changes to major. Here is the code:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df_test = (df.sort("product_id", "date")
             .groupby("product_id", "date", "fault_type")
             .agg(F.collect_list("fault_code").alias("fault_code_list")))

# Note: this window spec is defined but not used below.
window_function = Window.partitionBy("product_id").rangeBetween(Window.unboundedPreceding, Window.currentRow).orderBy("date")

df_test = df_test.withColumn("new_version_v2", F.collect_list("fault_code_list").over(Window.partitionBy("product_id").orderBy("date"))) \
                 .withColumn("new_version_v2", F.flatten("new_version_v2"))

Does someone know how to do that?


感情洁癖 2025-01-27 17:58:35


Your edit is close. This is not so simple, and I only came up with a solution that works but is not very neat.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Window over all rows of one product, ordered by date (used for lag and the running sum).
lagw = Window.partitionBy('product_id').orderBy('date')
# Window over one segment (product_id + grp), growing row by row, for the cumulative list.
grpw = Window.partitionBy(['product_id', 'grp']).orderBy('date').rowsBetween(Window.unboundedPreceding, 0)

df = (df.withColumn('grp', F.sum(
        (F.lag('fault_type').over(lagw).isNull()
        | (F.lag('fault_type').over(lagw) == 'major')
     ).cast('int')).over(lagw))
     .withColumn('fault_code', F.collect_list('fault_code').over(grpw)))

df.orderBy(['product_id', 'grp']).show()
# +----------+----------------------------------------+----------+----------+---+
# |product_id|                              fault_code|      date|fault_type|grp|
# +----------+----------------------------------------+----------+----------+---+
# |  prod_001|[fault_01]                              |2020-06-04|     minor|  1|
# |  prod_001|[fault_01, fault_03]                    |2020-07-02|     minor|  1|
# |  prod_001|[fault_01, fault_03, fault_09]          |2020-07-14|     minor|  1|
# |  prod_001|[fault_01, fault_03, fault_09, fault_01]|2020-07-14|     minor|  1|
# |  prod_001|[fault_01, fault_03, fault_09, fault_01]|2021-04-06|     major|  1|
# |  prod_001|[fault_02]                              |2021-06-22|     minor|  2|
# |  prod_001|[fault_02, fault_09]                    |2021-08-01|     minor|  2|
# |  prod_002|[fault_01]                              |2020-06-13|     minor|  1|
# |  prod_002|[fault_01, fault_02]                    |2020-07-11|     minor|  1|
...

Explanation:

First, I create the grp column to categorize each run of consecutive "minor" rows plus the following "major" row. I use sum and lag to check whether the previous row was "major" (or absent): if so, I increment the group id; otherwise I keep the same value as the previous row.

# cond is True when there is no previous row or the previous row was tagged 'major';
# summing these 0/1 values over the window increments grp at every segment boundary.
cond = (F.lag('fault_type').over(lagw).isNull()
        | (F.lag('fault_type').over(lagw) == 'major'))
F.sum(cond.cast('int')).over(lagw)

df.orderBy(['product_id', 'date']).select('product_id', 'date', 'fault_type', 'grp').show()

+----------+----------+----------+---+
|product_id|      date|fault_type|grp|
+----------+----------+----------+---+
|  prod_001|2020-06-04|     minor|  1|
|  prod_001|2020-07-02|     minor|  1|
|  prod_001|2020-07-14|     minor|  1|
|  prod_001|2020-07-14|     minor|  1|
|  prod_001|2021-04-06|     major|  1|
|  prod_001|2021-06-22|     minor|  2|
|  prod_001|2021-08-01|     minor|  2|
|  prod_002|2020-06-13|     minor|  1|
|  prod_002|2020-07-11|     minor|  1|
...

Once this grp is generated, I can partition by product_id and grp to apply collect_list.
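
A small optional addition of my own (not part of the original answer): the grp helper column only delimits the segments, so it can be dropped once the lists are built.

result = df.drop('grp').orderBy('product_id', 'date')  # remove the helper column and sort for display
result.show(truncate=False)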

友谊不毕业 2025-01-27 17:58:35


One possible approach is to use a Pandas UDF with applyInPandas.

Define a "normal" Python function
  • Input is a Pandas dataframe and output is another dataframe.
  • The dataframe's size doesn't matter

def grp(df):
    # df is a plain pandas DataFrame holding all rows of one group (one product_id)
    df['a'] = 'AAA'                           # add a dummy column
    df = df[df['fault_code'] == 'fault_01']   # keep only the fault_01 rows
    return df[['product_id', 'a']]            # return just the columns needed in the output
Test this function with an actual Pandas dataframe
  • The only thing to remember is that this dataframe is just a subset of your actual dataframe

grp(df.where('product_id == "prod_001"').toPandas())

    product_id  a
0   prod_001    AAA
3   prod_001    AAA

Apply this function to the Spark dataframe with applyInPandas
(df
    .groupBy('product_id')
    .applyInPandas(grp, 'product_id string, a string')  # the schema must describe the columns that grp() returns
    .show()
)

+----------+---+
|product_id|  a|
+----------+---+
|  prod_001|AAA|
|  prod_001|AAA|
|  prod_002|AAA|
|  prod_002|AAA|
|  prod_002|AAA|
|  prod_002|AAA|
|  prod_002|AAA|
+----------+---+
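
The grp function above only illustrates the mechanics and does not solve the original question. Below is a minimal sketch (my own, not part of this answer) of how the restart-on-major logic could be written inside the pandas function; collect_per_segment and out_schema are names introduced here for illustration:

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DateType, ArrayType

# Output schema: the input columns plus the aggregated list column.
out_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("fault_code_list", ArrayType(StringType()), True),
    StructField("date", DateType(), True),
    StructField("fault_type", StringType(), True),
])

def collect_per_segment(pdf):
    # pdf holds all rows of one product_id; process them in date order.
    pdf = pdf.sort_values("date").reset_index(drop=True)
    lists, current, restart = [], [], False
    for _, row in pdf.iterrows():
        if restart:                      # the previous row was 'major' -> start a new list
            current, restart = [], False
        if pd.notna(row["fault_code"]):  # 'major' rows carry no fault_code in the sample data
            current = current + [row["fault_code"]]
        lists.append(current)            # the 'major' row keeps the last state of the list
        if row["fault_type"] == "major":
            restart = True
    pdf["fault_code_list"] = lists
    return pdf[["product_id", "fault_code_list", "date", "fault_type"]]

result = df.groupBy("product_id").applyInPandas(collect_per_segment, out_schema)
result.orderBy("product_id", "date").show(truncate=False)

This keeps the running list per row and restarts it after each major row, matching the behaviour of the window-based answer above.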