Spark Structured Streaming - watermark behaviour between PySpark and Scala
I am trying to deploy a Structured Streaming app in Python using a watermark. I used the console sink to test this, but found odd behaviour between PySpark and Scala. I am using a derived column as the event-time column, on which the window and watermark rely. I am using two files, one with the value 2022-06-05 22:20:16 and the other with 2022-06-05 23:35:16.
Header: originalTimestamp,balance
File1:
2022-06-05 22:20:16,5
File2:
2022-06-05 23:35:16,7
Logic:
from pyspark.sql.functions import col, sum, to_timestamp, window

# Derive the event-time column "ts" from the raw timestamp string;
# 1-minute watermark, 2-minute tumbling windows.
streamingDF = spark.readStream.schema(customSchema).csv("csv_path") \
    .withColumn("ts", to_timestamp("originalTimestamp"))
interDF = streamingDF.withWatermark("ts", "1 minute") \
    .groupBy(window(streamingDF.ts, "2 minutes"))
outDF = interDF.agg(sum(col("balance")).alias("threshold")).filter(col("threshold") > 5)
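(customSchema isn't shown in the post; a definition matching the originalTimestamp,balance header would be something like the sketch below, my assumption rather than the actual schema.)

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Assumed schema matching the CSV header "originalTimestamp,balance";
# the actual customSchema definition is not in the post.
customSchema = StructType([
    StructField("originalTimestamp", StringType()),
    StructField("balance", DoubleType()),
])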
PySpark (write):
query = (
    outDF.writeStream.format("console")
    .outputMode("update")
    .option("checkpointLocation", "checkpoint_folder")
    .start()
)
query.awaitTermination()
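(Side note: since the files are dropped by hand, pinning the batch cadence with an explicit processing-time trigger can make the console batches easier to follow; the 10-second interval below is an arbitrary testing choice, not part of the original run.)

query = (
    outDF.writeStream.format("console")
    .outputMode("update")
    .trigger(processingTime="10 seconds")  # arbitrary interval, testing only
    .option("checkpointLocation", "checkpoint_folder")
    .start()
)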
Scala (write):
outDF.writeStream
.format("console")
.option("checkpointLocation", "checkpoint_folder")
.outputMode("update")
.start()
.awaitTermination()
Steps performed:
- Ran the PySpark and Scala jobs separately.
- Dropped the first file into the location twice (to satisfy the filter condition) and saw output in both Scala and Python.
- Dropped the second file (by this time the watermarkMs in both offsets locations is 1654468456000).
- Dropped the first file again. At this point the watermark is at 23:34:16 but the incoming record has 22:20:16, so it should be ignored, right? (See the sketch below.)

However, Scala does the job perfectly: it didn't pick it up! But the PySpark job picked it up in the next batch and printed the result. It's kind of weird!
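To spell out the expectation: the watermark is the maximum observed event time minus the configured delay, so the check the engine should make is simple arithmetic (a plain-Python sketch of the rule, using the timestamps from the files above):

from datetime import datetime, timedelta

# Watermark = max observed event time - watermark delay (1 minute here).
max_event_time = datetime(2022, 6, 5, 23, 35, 16)  # from File2
watermark = max_event_time - timedelta(minutes=1)  # 2022-06-05 23:34:16

late_record = datetime(2022, 6, 5, 22, 20, 16)     # File1, dropped again
print(late_record < watermark)  # True -> record is late and should be dropped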
Troubleshooting steps performed:
- Updated the PySpark version from 3.2.1 to 3.3.0 (latest).
- Validated the watermarkMs, and it is the highest value in the latest batch (see the progress-report snippet below).
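For reference, the engine's current watermark can also be read off the streaming query's progress report in PySpark (using the query handle from the write snippet above):

# lastProgress is a dict parsed from the progress JSON; its "eventTime"
# section carries the current watermark as an ISO-8601 UTC string.
progress = query.lastProgress
if progress and "eventTime" in progress:
    print(progress["eventTime"].get("watermark"))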
Not sure why I am seeing this. Has anyone seen this behaviour before? Any help would be appreciated!
Output of PySpark:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------+
| window|threshold|
+--------------------+---------+
|{2022-06-05 22:20...| 10.0|
+--------------------+---------+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+---------+
| window|threshold|
+--------------------+---------+
|{2022-06-05 23:34...| 7.0|
+--------------------+---------+
-------------------------------------------
Batch: 3
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+
-------------------------------------------
Batch: 4   <-- The batch that is not right!!
-------------------------------------------
+--------------------+---------+
| window|threshold|
+--------------------+---------+
|{2022-06-05 22:20...| 15.0|
+--------------------+---------+
Output of Scala:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------+
| window|threshold|
+--------------------+---------+
|[2022-06-05 23:34...| 7.0|
|[2022-06-05 22:20...| 10.0|
+--------------------+---------+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+
-------------------------------------------
Batch: 2
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+
-------------------------------------------
Batch: 3
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+
-------------------------------------------
Batch: 4
-------------------------------------------
+------+---------+
|window|threshold|
+------+---------+
+------+---------+