Spark/Scala: type mismatch error on joined result set when using Column operations; found: org.apache.spark.sql.Column, required: Boolean

Posted 2025-02-12 03:42:31


I have two dataframes that I am trying to join, and based on the joined set I am trying to assign a flag column.

demandDF1

+--------+-----------+---------+
|rgn_nm  |file_crt_dt|file_vrsn|
+--------+-----------+---------+
|DAO     |2022-06-30 |1        |
|DAO     |2022-06-30 |1        |
|CCC     |2022-06-30 |1        |
|APCC    |2022-06-30 |1        |
|ODM     |2022-06-29 |3        |
|EMF     |2022-06-30 |1        |
|T2Region|2022-06-29 |4        |
|BCC     |2022-06-30 |1        |
|EMF     |2022-07-01 |1        |
+--------+-----------+---------+

outputDistinctDF

+------+-----------+---------+
|region|file_crt_dt|file_vrsn|
+------+-----------+---------+
|DAO   |2022-06-30 |1        |
|CCC   |2022-06-29 |1        |
|APCC  |2022-06-30 |1        |
|ODM   |2022-06-29 |2        |
|EMF   |2022-06-30 |1        |
|BCC   |2022-06-30 |1        |
+------+-----------+---------+

I am trying to achieve something like this below.

+------------+-----------------+---------------+-------------+------------------+----------------+----+
|input_region|input_file_crt_dt|input_file_vrsn|output_region|output_file_crt_dt|output_file_vrsn|flag|
+------------+-----------------+---------------+-------------+------------------+----------------+----+
|DAO         |2022-06-30       |1              |DAO          |2022-06-30        |1               |0   |
|CCC         |2022-06-30       |1              |CCC          |2022-06-29        |1               |1   |
|T2Region    |2022-06-29       |4              |null         |null              |null            |1   |
|ODM         |2022-06-29       |3              |ODM          |2022-06-29        |2               |1   |
|APCC        |2022-06-30       |1              |APCC         |2022-06-30        |1               |0   |
|EMF         |2022-07-01       |1              |EMF          |2022-06-30        |1               |1   |
|EMF         |2022-06-30       |1              |EMF          |2022-06-30        |1               |0   |
|BCC         |2022-06-30       |1              |BCC          |2022-06-30        |1               |0   |
+------------+-----------------+---------------+-------------+------------------+----------------+----+

The logic:

(input_file_crt_dt > output_file_crt_dt ) or
(input_file_crt_dt = output_file_crt_dt and input_file_vrsn > output_file_vrsn) or
(output_region is null)

then flag = 1 else 0

I have tried the following code, but it ends up giving this error:

type mismatch;
 found   : org.apache.spark.sql.Column
 required: Boolean

Steps that I have followed:

val demandDF1 = Seq(
  ("DAO","2022-06-30","1"),
  ("DAO","2022-06-30","1"),
  ("CCC","2022-06-30","1"),
  ("APCC","2022-06-30","1"),
  ("ODM","2022-06-29","3"),
  ("EMF","2022-06-30","1"),
  ("T2Region","2022-06-29","4"),
  ("BCC","2022-06-30","1"),
  ("EMF","2022-07-01","1"))
  .toDF("rgn_nm","file_crt_dt","file_vrsn")
  .withColumn("file_crt_dt", col("file_crt_dt").cast("date"))
  .withColumn("file_vrsn", col("file_vrsn").cast("int"))

val outputDistinctDF = Seq(
  ("DAO","2022-06-30","1"),
  ("CCC","2022-06-29","1"),
  ("APCC","2022-06-30","1"),
  ("ODM","2022-06-29","2"),
  ("EMF","2022-06-30","1"),
  ("BCC","2022-06-30","1"))
  .toDF("region","file_crt_dt","file_vrsn")
  .withColumn("file_crt_dt", col("file_crt_dt").cast("date"))
  .withColumn("file_vrsn", col("file_vrsn").cast("int"))

val inputDistinctDF = demandDF1.select(col("rgn_nm"), col("file_crt_dt"), col("file_vrsn")).distinct()

val resultantDF = inputDistinctDF.join(outputDistinctDF,
  inputDistinctDF.col("rgn_nm") === outputDistinctDF.col("region")
  , "left_outer").select(inputDistinctDF.col("rgn_nm") as "input_region",
  inputDistinctDF.col("file_crt_dt") as "input_file_crt_dt",
  inputDistinctDF.col("file_vrsn") as "input_file_vrsn",
  outputDistinctDF.col("region") as "output_region",
  outputDistinctDF.col("file_crt_dt") as "output_file_crt_dt",
  outputDistinctDF.col("file_vrsn") as "output_file_vrsn"
).withColumn("flag",
  when((
    col("output_region").isNull || col("input_file_crt_dt").gt(col("output_file_crt_dt")) || ( col("input_file_crt_dt").eq(col("output_file_crt_dt")) && col("input_file_vrsn").gt(col("output_file_vrsn")) )
    ), lit("1")).otherwise(lit("0")))
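The type mismatch comes from `.eq`: `Column` does not define an `eq` method, so `col(...).eq(col(...))` resolves to `AnyRef.eq` (JVM reference equality), which returns a plain Scala `Boolean`. `Boolean.&&` then rejects the `Column` operand on its right, producing exactly "found: org.apache.spark.sql.Column required: Boolean". As a sketch of the fix, replacing `eq` with Spark's `===` (or `equalTo`) keeps the whole predicate in Column-land:

```scala
import org.apache.spark.sql.functions.{col, lit, when}

// === returns a Column expression evaluated per row,
// whereas .eq is AnyRef reference equality returning Boolean at compile time.
val flagCond =
  col("output_region").isNull ||
  col("input_file_crt_dt") > col("output_file_crt_dt") ||
  (col("input_file_crt_dt") === col("output_file_crt_dt") &&
   col("input_file_vrsn") > col("output_file_vrsn"))

// then: .withColumn("flag", when(flagCond, lit("1")).otherwise(lit("0")))
```

The same trap exists for `ne` and `equals`; on `Column`, always use `===` / `=!=` (or `equalTo` / `notEqual`) for row-level comparisons.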

逆流 2025-02-19 03:42:31

I've carefully read your conditions and created a separate variable for it in case something went wrong. But everything went well.

val cond = (
  ($"input_file_crt_dt" > $"output_file_crt_dt") ||
  (($"input_file_crt_dt" === $"output_file_crt_dt") &&
   ($"input_file_vrsn" > $"output_file_vrsn")) ||
  $"output_region".isNull
)

val resultantDF = inputDistinctDF
  .join(
    outputDistinctDF,
    inputDistinctDF.col("rgn_nm") === outputDistinctDF.col("region"),
    "left_outer")
  .select(
    inputDistinctDF.col("rgn_nm") as "input_region",
    inputDistinctDF.col("file_crt_dt") as "input_file_crt_dt",
    inputDistinctDF.col("file_vrsn") as "input_file_vrsn",
    outputDistinctDF.col("region") as "output_region",
    outputDistinctDF.col("file_crt_dt") as "output_file_crt_dt",
    outputDistinctDF.col("file_vrsn") as "output_file_vrsn")
  .withColumn("flag", when(cond, "1").otherwise("0"))
resultantDF.show()
// +------------+-----------------+---------------+-------------+------------------+----------------+----+
// |input_region|input_file_crt_dt|input_file_vrsn|output_region|output_file_crt_dt|output_file_vrsn|flag|
// +------------+-----------------+---------------+-------------+------------------+----------------+----+
// |         BCC|       2022-06-30|              1|          BCC|        2022-06-30|               1|   0|
// |        APCC|       2022-06-30|              1|         APCC|        2022-06-30|               1|   0|
// |    T2Region|       2022-06-29|              4|         null|              null|            null|   1|
// |         EMF|       2022-06-30|              1|          EMF|        2022-06-30|               1|   0|
// |         ODM|       2022-06-29|              3|          ODM|        2022-06-29|               2|   1|
// |         CCC|       2022-06-30|              1|          CCC|        2022-06-29|               1|   1|
// |         EMF|       2022-07-01|              1|          EMF|        2022-06-30|               1|   1|
// |         DAO|       2022-06-30|              1|          DAO|        2022-06-30|               1|   0|
// +------------+-----------------+---------------+-------------+------------------+----------------+----+
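For comparison, the same flag logic can also be expressed as a single SQL CASE via `expr`, which reads almost one-to-one against the stated logic (a sketch, assuming the joined frame with the same aliased column names):

```scala
import org.apache.spark.sql.functions.expr

// CASE expression mirroring the flag conditions verbatim.
val flagged = resultantDF.withColumn("flag", expr(
  """CASE WHEN input_file_crt_dt > output_file_crt_dt
           OR (input_file_crt_dt = output_file_crt_dt
               AND input_file_vrsn > output_file_vrsn)
           OR output_region IS NULL
      THEN '1' ELSE '0' END"""))
```

Note that in SQL, comparisons against NULL evaluate to NULL rather than false, so the `output_region IS NULL` branch is what catches the unmatched left-join rows (e.g. T2Region), just as `isNull` does in the Column version.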
