火花结构化流 - Pyspark和Scala之间的水印行为

发布于 2025-02-10 06:30:29 字数 3955 浏览 1 评论 0原文

我正在尝试使用Watermark在Python中部署一个结构化的流媒体应用程序。我使用控制台接收器来测试这一点,但发现了Pyspark和Scala之间的怪异行为。我将派生的列用作event time列在哪个窗口和水印所依赖的列。使用两个文件一个具有值2022-06-05 22:20:162022-06-05 23:35:16的文件。

标题:原始Timestamp,Balance

File1:

2022-06-05 22:20:20:16,5

文件2:

2022-06-05 23:35:16,7

逻辑:

streamingDF = spark.readStream.schema(
    customSchema).csv("csv_path").withColumn("ts", to_timestamp("originalTimestamp"))


interDF = streamingDF.withWatermark("ts", "1 minute").groupBy(
    window(streamingDF.ts, "2 minutes"))

outDF = interDF.agg(sum(col("balance")).alias("threshold")).filter(col("threshold") > 5)

pyspark(write):

query = (
    outDF.writeStream.format("console").outputMode("update").
    option("checkpointLocation", "checkpoint_folder").start())

query.awaitTermination()

scala(写):

 outDF.writeStream
    .format("console")
    .option("checkpointLocation", "checkpoint_folder")
    .outputMode("update")
    .start()
    .awaitTermination()

执行的步骤:

  1. 分别运行pyspark和scala。
  2. 将第一个文件放到位置两次(以满足过滤条件),然后在Scala和Python上看到输出
  3. 删除第二个文件(此时,这两个偏移位置上的水印均为> 16544468456000
  4. 再次删除了第一个文件,第一个文件,此时,水印为23:34:16,但传入的记录具有22:20:16,应忽略它。正确的?

但是,斯卡拉(Scala)完美地完成了这项工作 - 它没有拿起它!但是Pyspark Job在下一批中选择了这一点,并打印了结果。有点奇怪!

执行的故障排除步骤:

  1. 将Pyspark版本更新为3.3.0(最新),从3.2.1
  2. 验证了水印,这是最新批次中的最高值。

不知道为什么我看到这个行为,以前有人看过这种行为?任何帮助将不胜感激!

Pyspark的输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------+
|              window|threshold|
+--------------------+---------+
|{2022-06-05 22:20...|     10.0|
+--------------------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+---------+
|              window|threshold|
+--------------------+---------+
|{2022-06-05 23:34...|      7.0|
+--------------------+---------+

-------------------------------------------
Batch: 3
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 4 `The batch that is not right!!`
-------------------------------------------
+--------------------+---------+
|              window|threshold|
+--------------------+---------+
|{2022-06-05 22:20...|     15.0|
+--------------------+---------+

Scala的输出

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------+
|              window|threshold|
+--------------------+---------+
|[2022-06-05 23:34...|     7.0|
|[2022-06-05 22:20...|     10.0|
+--------------------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 3
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

I am trying to deploy a structured streaming app in python using watermark. I used console sink to test this, but found weird behaviour between pyspark and scala. I am using a derived column as event-time column on which window and watermark relies. Using two files one with value 2022-06-05 22:20:16 and other with 2022-06-05 23:35:16.

Header: originalTimestamp,balance

File1:

2022-06-05 22:20:16,5

File2:

2022-06-05 23:35:16,7

Logic:

streamingDF = spark.readStream.schema(
    customSchema).csv("csv_path").withColumn("ts", to_timestamp("originalTimestamp"))


interDF = streamingDF.withWatermark("ts", "1 minute").groupBy(
    window(streamingDF.ts, "2 minutes"))

outDF = interDF.agg(sum(col("balance")).alias("threshold")).filter(col("threshold") > 5)

Pyspark (write):

query = (
    outDF.writeStream.format("console").outputMode("update").
    option("checkpointLocation", "checkpoint_folder").start())

query.awaitTermination()

Scala (write):

 outDF.writeStream
    .format("console")
    .option("checkpointLocation", "checkpoint_folder")
    .outputMode("update")
    .start()
    .awaitTermination()

Steps performed:

  1. Run the pyspark and scala separately.
  2. Dropped the first file to location twice (to satisfy filter condition) and see output on both scala and python
  3. Dropped second file (By this time the watermarkMs on both the offsets location is 1654468456000)
  4. Again dropped first file, at this point the watermark is at 23:34:16 but the incoming record has 22:20:16 and it should be neglected. right?

However Scala does the job perfectly - it didn't picked it up! But Pyspark job picked this in next batch and printed the result. Its kind of weird !

Troubleshooting steps performed:

  1. Update pyspark version to 3.3.0 (latest) from 3.2.1
  2. validated the watermarkMs and it is the highest value in latest batch.

Not sure why i am seeing this, anyone has seen this behaviour before? any help would be appreciated!

Output of Pyspark:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------+
|              window|threshold|
+--------------------+---------+
|{2022-06-05 22:20...|     10.0|
+--------------------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+---------+
|              window|threshold|
+--------------------+---------+
|{2022-06-05 23:34...|      7.0|
+--------------------+---------+

-------------------------------------------
Batch: 3
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 4 `The batch that is not right!!`
-------------------------------------------
+--------------------+---------+
|              window|threshold|
+--------------------+---------+
|{2022-06-05 22:20...|     15.0|
+--------------------+---------+

Output of Scala

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------+
|              window|threshold|
+--------------------+---------+
|[2022-06-05 23:34...|     7.0|
|[2022-06-05 22:20...|     10.0|
+--------------------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 3
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文