Time-based window function in PySpark

Posted on 2025-01-17 16:34:29

My goal is to calculate another column, keeping the same number of rows as the original DataFrame, in which I can show the mean balance of each user over the last 30 days.

I guess it can be done with a window function, partitioning by user and somehow limiting the rows to those between the current date and 30 days before, but I don't know how to implement it in PySpark.

I have the following Spark DataFrame:

userId  date        balance
A       09/06/2020  100
A       03/07/2020  200
A       05/08/2020  600
A       30/08/2020  1000
A       15/09/2020  500
B       03/01/2020  100
B       05/04/2020  200
B       29/04/2020  600
B       01/05/2020  1600

My desired output DataFrame would be:

userId  date        balance  mean_last_30days_balance
A       09/06/2020  100      100
A       03/07/2020  200      150
A       05/08/2020  600      600
A       30/08/2020  1000     800
A       15/09/2020  500      750
B       03/01/2020  100      100
B       05/04/2020  200      200
B       29/04/2020  600      400
B       01/05/2020  1600     800
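
For example, for user A on 30/08/2020 the only rows within the previous 30 days are 05/08/2020 (balance 600) and the row itself (balance 1000), so the expected mean is (600 + 1000) / 2 = 800.
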
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession.builder.getOrCreate()

data = [("A", datetime.strptime("09/06/2020", '%d/%m/%Y'), 100),
        ("A", datetime.strptime("03/07/2020", '%d/%m/%Y'), 200),
        ("A", datetime.strptime("05/08/2020", '%d/%m/%Y'), 600),
        ("A", datetime.strptime("30/08/2020", '%d/%m/%Y'), 1000),
        ("A", datetime.strptime("15/09/2020", '%d/%m/%Y'), 500),
        ("B", datetime.strptime("03/01/2020", '%d/%m/%Y'), 100),
        ("B", datetime.strptime("05/04/2020", '%d/%m/%Y'), 200),
        ("B", datetime.strptime("29/04/2020", '%d/%m/%Y'), 600),
        ("B", datetime.strptime("01/05/2020", '%d/%m/%Y'), 1600)]

# balance holds integers, so the field must be IntegerType: the original
# StringType would make createDataFrame fail on the int values
schema = T.StructType([T.StructField("userId", T.StringType(), True),
                       T.StructField("date", T.DateType(), True),
                       T.StructField("balance", T.IntegerType(), True)
                      ])

sdf_prueba = spark.createDataFrame(data=data, schema=schema)
sdf_prueba.printSchema()
sdf_prueba.orderBy(F.col('userId').asc(), F.col('date').asc()).show(truncate=False)
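
With the corrected schema, printSchema() should report:

root
 |-- userId: string (nullable = true)
 |-- date: date (nullable = true)
 |-- balance: integer (nullable = true)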

1 Comment

情绪失控 2025-01-24 16:34:29

You can use the RANGE BETWEEN keyword:

sdf_prueba.createOrReplaceTempView("table1")

spark.sql(
    """SELECT *, mean(balance) OVER (
        PARTITION BY userid 
        ORDER BY CAST(date AS timestamp)  
        RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM table1""").show()


+------+----------+-------+-----+
|userId|      date|balance| mean|
+------+----------+-------+-----+
|     A|2020-06-09|    100|100.0|
|     A|2020-07-03|    200|150.0|
|     A|2020-08-05|    600|600.0|
|     A|2020-08-30|   1000|800.0|
|     A|2020-09-15|    500|750.0|
|     B|2020-01-03|    100|100.0|
|     B|2020-04-05|    200|200.0|
|     B|2020-04-29|    600|400.0|
|     B|2020-05-01|   1600|800.0|
+------+----------+-------+-----+

If you want to use the PySpark API, you need to convert days to Unix seconds in order to use rangeBetween:

from pyspark.sql import Window
from pyspark.sql.functions import col, mean, unix_timestamp

one_month_in_seconds = 2629743  # average month (~30.44 days), not exactly 30 days
w = (
    Window.partitionBy("userid")
    .orderBy(unix_timestamp(col("date").cast("timestamp")))
    .rangeBetween(-one_month_in_seconds, Window.currentRow)
)

sdf_prueba.select(col("*"), mean("balance").over(w).alias("mean")).show()

+------+----------+-------+-----+
|userId|      date|balance| mean|
+------+----------+-------+-----+
|     A|2020-06-09|    100|100.0|
|     A|2020-07-03|    200|150.0|
|     A|2020-08-05|    600|600.0|
|     A|2020-08-30|   1000|800.0|
|     A|2020-09-15|    500|750.0|
|     B|2020-01-03|    100|100.0|
|     B|2020-04-05|    200|200.0|
|     B|2020-04-29|    600|400.0|
|     B|2020-05-01|   1600|800.0|
+------+----------+-------+-----+
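
Note that 2629743 seconds is the length of an average month (about 30.44 days), not exactly 30 days, so this window is slightly wider than the SQL INTERVAL 30 DAYS window above. A minimal sketch of an exactly-30-day window, assuming the same sdf_prueba DataFrame and using the common cast-to-long idiom to get Unix seconds:

from pyspark.sql import Window
from pyspark.sql import functions as F

days = lambda i: i * 86400  # helper: number of days -> seconds

w_30d = (
    Window.partitionBy("userId")
    # casting a timestamp to long yields Unix seconds, equivalent to unix_timestamp
    .orderBy(F.col("date").cast("timestamp").cast("long"))
    .rangeBetween(-days(30), Window.currentRow)
)

sdf_prueba.withColumn(
    "mean_last_30days_balance", F.mean("balance").over(w_30d)
).show()

On this sample data both variants happen to give the same means, since no two rows of the same user are between 30 and 30.44 days apart.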
