加入两个dataframes之后,在最终数据框架上有条件格式

发布于 2025-01-17 22:47:31 字数 1895 浏览 0 评论 0原文

PySpark DataFrame 场景:

  1. 有一个名为 DF 的 DataFrame。 DF 的两个主要列是 IDDate
  2. 每个 ID 平均有 40 多个唯一的 Date(非连续日期)。
  3. 现在,有第二个名为 DF_date 的 DataFrame,其中有一列名为 DateDates 中的日期范围介于 DF 中“日期”的最大值和最小值之间。
  4. 现在,目标是用每个唯一“ID”的连续开始日期和结束日期填充 DF(缺少的中断日期用 DF_date< 之间的 left join 填充) /code> 和 DF

预期

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+ 

最终

+-------------+
|         Date|
+-------------+
|   2021-07-01|
|   2021-07-02|
|   2021-07-03|
|   2021-07-04|
|   2021-07-05|
|   2021-07-06|
+-------------+

输出:

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-02|     81119.73|         Ax3838J|
|   2021-07-03|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-03|     81249.76|         Bz3838J|
|   2021-07-04|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+ 

PySpark DataFrame Scenario:

  1. There is a DataFrame called DF. Two main columns of DF are ID and Date.
  2. Each ID has on average 40+ unique Dates (not continuous dates).
  3. Now, there is second DataFrame called DF_date which has one column named Date. The dates in Dates range between maximum and minimum of 'Date' from DF.
  4. Now, the goal is to fill DF with the continuous Start and End date of each unique 'ID' (missing discontinued dates are filled with left join between DF_date and DF.

DF

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+ 

DF_date

+-------------+
|         Date|
+-------------+
|   2021-07-01|
|   2021-07-02|
|   2021-07-03|
|   2021-07-04|
|   2021-07-05|
|   2021-07-06|
+-------------+

Expected Final Output:

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-02|     81119.73|         Ax3838J|
|   2021-07-03|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-03|     81249.76|         Bz3838J|
|   2021-07-04|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+ 

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

岁月打碎记忆 2025-01-24 22:47:31

你的问题没有意义。为什么要有一个包含开始日期和结束日期的 DF_date 数据框,使用它们填充日期,然后诉诸于使用 DF 开始日期和结束日期。为什么不直接使用每组的 DF 最短和最长日期来填充缺失的日期。

无论如何,这就是您根据 DF_Date 填写缺失日期的方法

按照您的评论,请参阅我的编辑

  new = (DF.groupby('ID')
       .agg(to_date(first('Date')).alias('min_date')#minimum date per group
       ,to_date(last('Date')).alias('max_date')#max date per group
       ,*[collect_list(i).alias(f"{i}") for i in DF.drop('ID').columns])#Dates and Val into an array for each group
       
       #Explosion results into a new column 2 which ideally is the new date, Drop existing date and rename 2 to date
       .selectExpr("ID","inline(arrays_zip(Date,Val,sequence(min_date,max_date,interval 1 day)))")
       .drop('Date').withColumnRenamed('2','Date')
       #Forward fill the Val column
      .withColumn('Val', coalesce(last('val',True).over(Window.partitionBy('ID').orderBy('Date'))))
       
      ).show()

+-------+--------+----------+
|     ID|     Val|      Date|
+-------+--------+----------+
|Ax3838J|81119.73|2021-07-01|
|Ax3838J|81289.62|2021-07-02|
|Ax3838J|81385.62|2021-07-03|
|Ax3838J|81385.62|2021-07-04|
|Ax3838J|81385.62|2021-07-05|
|Bz3838J|81249.76|2021-07-02|
|Bz3838J|81324.28|2021-07-03|
|Bz3838J|81329.28|2021-07-04|
|Bz3838J|81329.28|2021-07-05|
|Bz3838J|81329.28|2021-07-06|
+-------+--------+----------+

Your question doesn't make sense. Why have a DF_date dataframe with start and end dates, use them to fill in date and then resort to using the DF start and end date. Why not just fill in missing dates by using DF min and max date for each group.

Anyway, this is how you fill in missing dates based on DF_Date

Following your comments, see my edits

  new = (DF.groupby('ID')
       .agg(to_date(first('Date')).alias('min_date')#minimum date per group
       ,to_date(last('Date')).alias('max_date')#max date per group
       ,*[collect_list(i).alias(f"{i}") for i in DF.drop('ID').columns])#Dates and Val into an array for each group
       
       #Explosion results into a new column 2 which ideally is the new date, Drop existing date and rename 2 to date
       .selectExpr("ID","inline(arrays_zip(Date,Val,sequence(min_date,max_date,interval 1 day)))")
       .drop('Date').withColumnRenamed('2','Date')
       #Forward fill the Val column
      .withColumn('Val', coalesce(last('val',True).over(Window.partitionBy('ID').orderBy('Date'))))
       
      ).show()

+-------+--------+----------+
|     ID|     Val|      Date|
+-------+--------+----------+
|Ax3838J|81119.73|2021-07-01|
|Ax3838J|81289.62|2021-07-02|
|Ax3838J|81385.62|2021-07-03|
|Ax3838J|81385.62|2021-07-04|
|Ax3838J|81385.62|2021-07-05|
|Bz3838J|81249.76|2021-07-02|
|Bz3838J|81324.28|2021-07-03|
|Bz3838J|81329.28|2021-07-04|
|Bz3838J|81329.28|2021-07-05|
|Bz3838J|81329.28|2021-07-06|
+-------+--------+----------+
我的黑色迷你裙 2025-01-24 22:47:31

在上面的问题中,我后来意识到@wwnde建议,无需为日期创建单独的DF。

下面提供的代码也达到了目的 -

# Partition the data based on the client and order by DATE
window_fn = Window.partitionBy("ID").orderBy('DATE')


# the ranges of dates between the DATE value in the current row and the following row 

next_date = F.coalesce(F.lead("DATE", 1).over(window_fn), F.col("DATE") + F.expr("interval 1 day"))

end_date_range = next_date - F.expr("interval 1 day")

# then using 'sequence' function to generate all intermediate dates
# exploded this array to fill in values for the missing dates.

final_result = DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))\
  .withColumn("DATE", F.explode("Ranges"))\
  .withColumn("DATE", F.to_timestamp("date", 'yyyy-MM-dd'))\
  .drop("Ranges")

display(final_result)

In the above question, I later realised as suggested @wwnde there is no need to create a separate DF for Dates.

Code provided below serves the purpose too -

# Partition the data based on the client and order by DATE
window_fn = Window.partitionBy("ID").orderBy('DATE')


# the ranges of dates between the DATE value in the current row and the following row 

next_date = F.coalesce(F.lead("DATE", 1).over(window_fn), F.col("DATE") + F.expr("interval 1 day"))

end_date_range = next_date - F.expr("interval 1 day")

# then using 'sequence' function to generate all intermediate dates
# exploded this array to fill in values for the missing dates.

final_result = DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))\
  .withColumn("DATE", F.explode("Ranges"))\
  .withColumn("DATE", F.to_timestamp("date", 'yyyy-MM-dd'))\
  .drop("Ranges")

display(final_result)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文