Left join on a condition and aggregate max with Spark Python / PySpark

Posted on 2025-02-07 06:43:46

What I have: 2 massive Spark dataframes, but here are some samples:

  • Dataframe A:

    ID    IG   OpenDate
    P111  100  13/04/2022
    P222  101  16/04/2022
    P333  102  20/04/2022
  • Dataframe B:

    IG   Service  Dt_Service
    100  A        12/04/2022
    100  B        13/04/2022
    100  B        14/04/2022
    101  A        15/04/2022
    101  A        16/04/2022
    101  B        17/04/2022
    101  B        18/04/2022
    102  A        19/04/2022
    102  B        20/04/2022

What I want: I want to left join the columns 'Service' and 'Dt_Service' onto Dataframe A using the key 'IG', but keeping only the row with the maximum 'Dt_Service' for each 'IG'. In other words, I need the most recent 'Service' with its corresponding date for each row in Dataframe A. This is the result I expect:

    ID    IG   OpenDate    Service  Dt_Service
    P111  100  13/04/2022  B        14/04/2022
    P222  101  16/04/2022  B        18/04/2022
    P333  102  20/04/2022  B        20/04/2022

Tool: Spark 2.2 with PySpark, since I am working on Hadoop.

Thank you for your help
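
One way to get this result (a minimal sketch, assuming an existing SparkSession named spark and the sample data above; df_a, df_b and latest_b are illustrative names) is to reduce Dataframe B to one row per IG by taking the max of a (date, Service) struct, which compares the date first, and then left join it onto Dataframe A. If max over a struct is not supported on your Spark version, the row_number window approach in the answer below works just as well.

from pyspark.sql import functions as F

# Sample dataframes mirroring the tables above (illustrative names).
df_a = spark.createDataFrame(
    [("P111", "100", "13/04/2022"), ("P222", "101", "16/04/2022"), ("P333", "102", "20/04/2022")],
    ["ID", "IG", "OpenDate"])
df_b = spark.createDataFrame(
    [("100", "A", "12/04/2022"), ("100", "B", "13/04/2022"), ("100", "B", "14/04/2022"),
     ("101", "A", "15/04/2022"), ("101", "A", "16/04/2022"), ("101", "B", "17/04/2022"),
     ("101", "B", "18/04/2022"), ("102", "A", "19/04/2022"), ("102", "B", "20/04/2022")],
    ["IG", "Service", "Dt_Service"])

# For each IG keep B's most recent row: max over a (date, Service) struct
# compares the date first, so the latest date wins and brings its Service along.
latest_b = (df_b
    .withColumn("dt", F.to_date("Dt_Service", "dd/MM/yyyy"))
    .groupBy("IG")
    .agg(F.max(F.struct("dt", "Service")).alias("m"))
    .select("IG",
            F.col("m.Service").alias("Service"),
            F.date_format(F.col("m.dt"), "dd/MM/yyyy").alias("Dt_Service")))

# Left join keeps every row of Dataframe A, even an IG with no service history.
df_a.join(latest_b, "IG", "left").show()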

Comments (1)

暖心男生 2025-02-14 06:43:46

As samkart said, we can use rank/row_number to get the latest service first, then join to get the desired result:

from pyspark.sql import functions as F
from pyspark.sql import Window

# Rebuild the sample of Dataframe B (schema se, data de), then Dataframe A.
se = "IG string,Service string,Dt_Service string"
de = [("100","A","2022-04-12"),("100","B","2022-04-13"),("100","B","2022-04-14"),("101","A","2022-04-15"),("101","A","2022-04-16"),("101","B","2022-04-17"),("101","B","2022-04-18"),("102","A","2022-04-19"),("102","B","2022-04-20")]
fd = spark.createDataFrame(de, se)
df1 = spark.createDataFrame([("P111","100","13/04/2022"),("P222","101","16/04/2022"),("P333","102","20/04/2022")], "ID string,IG string,OpenDate string")

# Rank B's rows per IG by date (most recent first) and keep only the top one.
df2 = fd.withColumn("rn", F.row_number().over(Window.partitionBy("IG").orderBy(F.to_date(F.col("Dt_Service")).desc()))).filter("rn == 1").drop("rn")
df1.join(df2, "IG", "inner").show()

#output
+---+----+----------+-------+----------+
| IG|  ID|  OpenDate|Service|Dt_Service|
+---+----+----------+-------+----------+
|100|P111|13/04/2022|      B|2022-04-14|
|101|P222|16/04/2022|      B|2022-04-18|
|102|P333|20/04/2022|      B|2022-04-20|
+---+----+----------+-------+----------+
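
Note that the join above is an inner join; since the question asks for a left join (so that rows of Dataframe A whose IG has no match in Dataframe B are kept), the join type can simply be swapped on the last line. With the sample data the output is identical because every IG has at least one service row:

# Left join keeps every row of df1 even if its IG never appears in df2.
df1.join(df2, "IG", "left").show()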