PySpark - groupby and aggregation with multiple conditions
I want to group and aggregate data with several conditions. The dataframe contains a product id, fault codes, date and a fault type. Here, I prepared a sample dataframe:
from pyspark.sql.types import StructType, StructField, StringType, DateType
from datetime import date
data = [("prod_001","fault_01",date(2020, 6, 4),"minor"),
("prod_001","fault_03",date(2020, 7, 2),"minor"),
("prod_001","fault_09",date(2020, 7, 14),"minor"),
("prod_001","fault_01",date(2020, 7, 14),"minor"),
("prod_001",None,date(2021, 4, 6),"major"),
("prod_001","fault_02",date(2021, 6, 22),"minor"),
("prod_001","fault_09",date(2021, 8, 1),"minor"),
("prod_002","fault_01",date(2020, 6, 13),"minor"),
("prod_002","fault_05",date(2020, 7, 11),"minor"),
("prod_002",None,date(2020, 8, 1),"major"),
("prod_002","fault_01",date(2021, 4, 15),"minor"),
("prod_002","fault_02",date(2021, 5, 11),"minor"),
("prod_002","fault_03",date(2021, 5, 13),"minor"),
]
schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("fault_code", StringType(), True),
    StructField("date", DateType(), True),
    StructField("fault_type", StringType(), True),
])
df = spark.createDataFrame(data=data, schema=schema)
display(df)
In general I would like to group by product_id and then aggregate the fault_codes into lists along the dates. The special part here is that the aggregation into a list continues until the fault_type changes from minor to major; in that case the row tagged as major adopts the last state of the aggregation (see screenshot). Within one product_id the list aggregation should then start from new (with the next fault_code that is flagged as minor).
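The screenshot with the expected output is not reproduced here, so purely as an illustration, this is my own reading of the desired result for prod_001, derived from the rules above (the two rows sharing 2020-07-14 are assumed to keep their input order):

from datetime import date

# Hand-derived illustration for prod_001: (date, fault_type, desired fault_code_list)
expected_prod_001 = [
    (date(2020, 6, 4),  "minor", ["fault_01"]),
    (date(2020, 7, 2),  "minor", ["fault_01", "fault_03"]),
    (date(2020, 7, 14), "minor", ["fault_01", "fault_03", "fault_09"]),
    (date(2020, 7, 14), "minor", ["fault_01", "fault_03", "fault_09", "fault_01"]),
    (date(2021, 4, 6),  "major", ["fault_01", "fault_03", "fault_09", "fault_01"]),  # adopts the last state
    (date(2021, 6, 22), "minor", ["fault_02"]),                                      # starts from new
    (date(2021, 8, 1),  "minor", ["fault_02", "fault_09"]),
]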
In some other posts I found the following code snippet, which I already tried. Unfortunately I didn't manage to get the full aggregation with all conditions yet.
from pyspark.sql import functions as F

df.sort("product_id", "date").groupby("product_id", "date").agg(F.collect_list("fault_code"))
Edit:
Got a little bit closer with Window.partitionBy(), but I am still not able to start the collect_list() from new once the fault_type changes to major. Here is the code:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df_test = df.sort("product_id", "date").groupby("product_id", "date", "fault_type").agg(F.collect_list("fault_code")).withColumnRenamed("collect_list(fault_code)", "fault_code_list")
window_function = Window.partitionBy("product_id").orderBy("date").rangeBetween(Window.unboundedPreceding, Window.currentRow)
df_test = df_test.withColumn("new_version_v2", F.collect_list("fault_code_list").over(Window.partitionBy("product_id").orderBy("date"))) \
    .withColumn("new_version_v2", F.flatten("new_version_v2"))
Does someone know how to do that?
Comments (2)
Your edit is close. This is not that simple, and I only came up with a solution that works but is not so neat.
Explanation:
First I create a grp column to categorize the consecutive "minor" rows plus the following "major" row. I use sum and lag to see whether the previous row was "major": if it was, I increment, otherwise I keep the same value as the previous row. Once this grp is generated, I can partition by product_id and grp to apply collect_list.
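A minimal sketch of this grp-based idea, assuming the sample df from the question (the names prev_major, grp and fault_code_list are illustrative choices, not taken from the original answer):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rows ordered by date within each product_id
w_order = Window.partitionBy("product_id").orderBy("date")

# Flag rows whose previous row was tagged "major", then take a running sum of the flag:
# grp increments right after every "major" row, so each group holds one run of
# "minor" rows plus the closing "major" row.
df_grp = (df
    .withColumn("prev_major", F.when(F.lag("fault_type").over(w_order) == "major", 1).otherwise(0))
    .withColumn("grp", F.sum("prev_major").over(w_order)))

# Cumulative collect_list within each (product_id, grp); collect_list skips null
# fault_codes, so the "major" row simply keeps the last accumulated state.
w_grp = (Window.partitionBy("product_id", "grp").orderBy("date")
         .rowsBetween(Window.unboundedPreceding, Window.currentRow))

result = (df_grp
    .withColumn("fault_code_list", F.collect_list("fault_code").over(w_grp))
    .drop("prev_major"))

result.orderBy("product_id", "date").show(truncate=False)

Note that collect_list does not guarantee the order of rows that share the same date, so a secondary sort key may be needed if that matters.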
One possible approach is using a Pandas UDF with applyInPandas (a rough sketch follows the steps below):
1. Define a "normal" Python function.
2. Test this function with an actual Pandas dataframe.
3. Apply this function to the Spark dataframe with applyInPandas.
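A minimal sketch of those three steps, assuming the sample df from the question; the function name collect_faults, the result column fault_code_list and the output schema string are my own assumptions:

import pandas as pd

# Step 1: a "normal" Python/Pandas function that builds the cumulative fault list per
# product and starts from new after every "major" row.
def collect_faults(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.sort_values("date").reset_index(drop=True)
    lists, current = [], []
    for code, ftype in zip(pdf["fault_code"], pdf["fault_type"]):
        if pd.notna(code):
            current = current + [code]
        lists.append(current)      # the "major" row keeps the accumulated list (its fault_code is null)
        if ftype == "major":
            current = []           # restart for the following "minor" rows
    pdf["fault_code_list"] = lists
    return pdf

# Step 2: test the function on a plain Pandas dataframe first
# print(collect_faults(df.filter("product_id = 'prod_001'").toPandas()))

# Step 3: apply it per product_id on the Spark dataframe
out_schema = ("product_id string, fault_code string, date date, "
              "fault_type string, fault_code_list array<string>")
result = df.groupBy("product_id").applyInPandas(collect_faults, schema=out_schema)
result.orderBy("product_id", "date").show(truncate=False)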