Spark/Scala: join result set gives a type mismatch error when applying Column operations; found: org.apache.spark.sql.Column, required: Boolean
I have two DataFrames that I want to join, and I am trying to assign a flag column based on the joined result.
demandDF1
+--------+-----------+---------+
|rgn_nm |file_crt_dt|file_vrsn|
+--------+-----------+---------+
|DAO |2022-06-30 |1 |
|DAO |2022-06-30 |1 |
|CCC |2022-06-30 |1 |
|APCC |2022-06-30 |1 |
|ODM |2022-06-29 |3 |
|EMF |2022-06-30 |1 |
|T2Region|2022-06-29 |4 |
|BCC |2022-06-30 |1 |
|EMF |2022-07-01 |1 |
+--------+-----------+---------+
outputDistinctDF
+------+-----------+---------+
|region|file_crt_dt|file_vrsn|
+------+-----------+---------+
|DAO |2022-06-30 |1 |
|CCC |2022-06-29 |1 |
|APCC |2022-06-30 |1 |
|ODM |2022-06-29 |2 |
|EMF |2022-06-30 |1 |
|BCC |2022-06-30 |1 |
+------+-----------+---------+
I am trying to produce the following result:
+------------+-----------------+---------------+-------------+------------------+----------------+----+
|input_region|input_file_crt_dt|input_file_vrsn|output_region|output_file_crt_dt|output_file_vrsn|flag|
+------------+-----------------+---------------+-------------+------------------+----------------+----+
|DAO |2022-06-30 |1 |DAO |2022-06-30 |1 |0 |
|CCC |2022-06-30 |1 |CCC |2022-06-29 |1 |1 |
|T2Region |2022-06-29 |4 |null |null |null |1 |
|ODM |2022-06-29 |3 |ODM |2022-06-29 |2 |1 |
|APCC |2022-06-30 |1 |APCC |2022-06-30 |1 |0 |
|EMF |2022-07-01 |1 |EMF |2022-06-30 |1 |1 |
|EMF |2022-06-30 |1 |EMF |2022-06-30 |1 |0 |
|BCC |2022-06-30 |1 |BCC |2022-06-30 |1 |0 |
+------------+-----------------+---------------+-------------+------------------+----------------+----+
The logic:
(input_file_crt_dt > output_file_crt_dt ) or
(input_file_crt_dt = output_file_crt_dt and input_file_vrsn > output_file_vrsn) or
(output_region is null)
then flag = 1 else 0
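As a plain-Scala reference for the rule above (no Spark involved), here is a minimal sketch that can be used to check the expected table row by row. The names `FileInfo` and `FlagRule.flag` are hypothetical and exist only for this illustration; a missing output row (the left-join miss) is modelled as `None`.

```scala
import java.time.LocalDate

// Hypothetical row type, introduced only for this illustration.
final case class FileInfo(region: String, createdOn: LocalDate, version: Int)

object FlagRule {
  // Returns 1 when there is no matching output row, when the input file has a
  // later creation date, or when the dates are equal and the input version is
  // higher. Returns 0 in every other case.
  def flag(input: FileInfo, output: Option[FileInfo]): Int = output match {
    case None => 1
    case Some(out) =>
      val newerDate = input.createdOn.isAfter(out.createdOn)
      val sameDateNewerVersion =
        input.createdOn.isEqual(out.createdOn) && input.version > out.version
      if (newerDate || sameDateNewerVersion) 1 else 0
  }
}
```

For example, the CCC row (input 2022-06-30, output 2022-06-29) gets flag 1 because the input date is later, while the DAO row gets 0 because date and version are identical.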
I have tried the following code, but it fails with a type mismatch error (found: org.apache.spark.sql.Column, required: Boolean).
Steps that I have followed:
// Assumes a spark-shell session (or an existing SparkSession named `spark`).
import org.apache.spark.sql.functions.{col, lit, when}
import spark.implicits._

// Input rows; the date and version columns are cast from strings.
val demandDF1 = Seq(
  ("DAO", "2022-06-30", "1"),
  ("DAO", "2022-06-30", "1"),
  ("CCC", "2022-06-30", "1"),
  ("APCC", "2022-06-30", "1"),
  ("ODM", "2022-06-29", "3"),
  ("EMF", "2022-06-30", "1"),
  ("T2Region", "2022-06-29", "4"),
  ("BCC", "2022-06-30", "1"),
  ("EMF", "2022-07-01", "1")
).toDF("rgn_nm", "file_crt_dt", "file_vrsn")
  .withColumn("file_crt_dt", col("file_crt_dt").cast("date"))
  .withColumn("file_vrsn", col("file_vrsn").cast("int"))

// Output rows, one per region.
val outputDistinctDF = Seq(
  ("DAO", "2022-06-30", "1"),
  ("CCC", "2022-06-29", "1"),
  ("APCC", "2022-06-30", "1"),
  ("ODM", "2022-06-29", "2"),
  ("EMF", "2022-06-30", "1"),
  ("BCC", "2022-06-30", "1")
).toDF("region", "file_crt_dt", "file_vrsn")
  .withColumn("file_crt_dt", col("file_crt_dt").cast("date"))
  .withColumn("file_vrsn", col("file_vrsn").cast("int"))

// Drop the duplicate input rows before joining.
val inputDistinctDF = demandDF1.select(col("rgn_nm"), col("file_crt_dt"), col("file_vrsn")).distinct()

// Left join on region, rename the columns, then add the flag from the logic above.
val resultantDF = inputDistinctDF
  .join(outputDistinctDF, inputDistinctDF.col("rgn_nm") === outputDistinctDF.col("region"), "left_outer")
  .select(
    inputDistinctDF.col("rgn_nm") as "input_region",
    inputDistinctDF.col("file_crt_dt") as "input_file_crt_dt",
    inputDistinctDF.col("file_vrsn") as "input_file_vrsn",
    outputDistinctDF.col("region") as "output_region",
    outputDistinctDF.col("file_crt_dt") as "output_file_crt_dt",
    outputDistinctDF.col("file_vrsn") as "output_file_vrsn"
  )
  .withColumn("flag",
    when(
      col("output_region").isNull ||
        col("input_file_crt_dt").gt(col("output_file_crt_dt")) ||
        (col("input_file_crt_dt").eq(col("output_file_crt_dt")) && col("input_file_vrsn").gt(col("output_file_vrsn"))),
      lit("1")
    ).otherwise(lit("0")))
Comments (1)
I've carefully read your conditions and created a separate variable for it in case something went wrong. But everything went well.
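The code for this answer is not shown on this page, so the following is only a sketch of what a separate condition variable could look like, not the answerer's actual code. The likely cause of the error in the question is `.eq`: `Column` provides `===` (and `equalTo`) for building an equality expression, whereas `eq` is Scala's reference-equality method inherited from `AnyRef` and returns a plain `Boolean`, so the compiler then sees `Boolean && Column` and reports "found: Column, required: Boolean". The names `FlagJoinSketch`, `needsFlag`, `withFlag` and `sample` are hypothetical, and a local Spark installation on the classpath is assumed.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

object FlagJoinSketch {
  // Flag condition held in its own value. isNull, >, ===, && and || are all
  // Column methods here, so the whole expression type-checks as a Column.
  val needsFlag: Column =
    col("output_region").isNull ||
      col("input_file_crt_dt") > col("output_file_crt_dt") ||
      (col("input_file_crt_dt") === col("output_file_crt_dt") &&
        col("input_file_vrsn") > col("output_file_vrsn"))

  // Left-joins input to output on region, renames the columns, adds the flag.
  def withFlag(input: DataFrame, output: DataFrame): DataFrame =
    input
      .join(output, input("rgn_nm") === output("region"), "left_outer")
      .select(
        input("rgn_nm").as("input_region"),
        input("file_crt_dt").as("input_file_crt_dt"),
        input("file_vrsn").as("input_file_vrsn"),
        output("region").as("output_region"),
        output("file_crt_dt").as("output_file_crt_dt"),
        output("file_vrsn").as("output_file_vrsn"))
      .withColumn("flag", when(needsFlag, lit("1")).otherwise(lit("0")))

  // Builds the question's sample data and returns the flagged join result.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Casts the string columns to date and int, as in the question.
    def typed(df: DataFrame): DataFrame =
      df.withColumn("file_crt_dt", col("file_crt_dt").cast("date"))
        .withColumn("file_vrsn", col("file_vrsn").cast("int"))

    val demandDF1 = typed(Seq(
      ("DAO", "2022-06-30", "1"), ("DAO", "2022-06-30", "1"),
      ("CCC", "2022-06-30", "1"), ("APCC", "2022-06-30", "1"),
      ("ODM", "2022-06-29", "3"), ("EMF", "2022-06-30", "1"),
      ("T2Region", "2022-06-29", "4"), ("BCC", "2022-06-30", "1"),
      ("EMF", "2022-07-01", "1")
    ).toDF("rgn_nm", "file_crt_dt", "file_vrsn"))

    val outputDistinctDF = typed(Seq(
      ("DAO", "2022-06-30", "1"), ("CCC", "2022-06-29", "1"),
      ("APCC", "2022-06-30", "1"), ("ODM", "2022-06-29", "2"),
      ("EMF", "2022-06-30", "1"), ("BCC", "2022-06-30", "1")
    ).toDF("region", "file_crt_dt", "file_vrsn"))

    withFlag(demandDF1.distinct(), outputDistinctDF)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("flag-join-sketch")
      .getOrCreate()
    sample(spark).show(false)
    spark.stop()
  }
}
```

Keeping the condition in a value with an explicit `Column` type annotation means a stray `Boolean`-returning call such as `.eq` is caught at that one line, which makes the mismatch easier to locate than inside a nested `when(...)`.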