Does Flink keep a history of closed event-time windows with watermarks?
I have a Flink job that aggregates data using keyed tumbling windows with event time and watermarks.
My question is: does Flink keep state for windows it has already closed?
Otherwise, I have no other explanation for why an event that belongs to a window that was never opened before opens a new window instead of being dropped immediately.
Given that our windows are 1 hour long and forBoundedOutOfOrderness is 10 minutes,
let's take an example:
event1 = ("2022-01-01T08:25:00Z") => window fired
event2 = ("2022-01-01T09:25:00Z") => window created but not fired, as expected
event3 = ("2022-01-01T05:25:00Z") => is aggregated with event4 instead of being dropped (why?)
event4 = ("2022-01-01T05:40:00Z") => is aggregated with event3 instead of being dropped (why?)
val stream = env
  .fromSource(
    kafkaSource,
    WatermarkStrategy
      .forBoundedOutOfOrderness[(String, EnrichedProcess, KafkaHeaders)](Duration.ofMinutes(outOfOrdernessMinutes)) // Watermark max time for late events
      .withIdleness(Duration.ofSeconds(idleness))
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, EnrichedProcess, KafkaHeaders)] {
        override def extractTimestamp(element: (String, EnrichedProcess, KafkaHeaders), recordTimestamp: Long): Long = {
          logger.info(
            LogMessage(
              element._3.orgId,
              s"Received incoming EnrichedProcess update_time: ${element._3.updateTime}, process time: ${recordTimestamp.asDate}",
              element._3.flowId
            )
          )
          element._3.updateTime.asEpoch
        }
      }),
    s"Source - $kConsumeTopic"
  )

stream
  .keyBy(element => (element._2.orgId -> element._2.procUid))
  .window(TumblingEventTimeWindows.of(Time.hours(tumblingWindowHours), Time.minutes(windowStartingOffsetMinutes)))
  .reduce(new ReduceFunc)
  .name("Aggregated EnrichedProcess")
  .sinkTo(kafkaConnector.createKafkaSink(kProducerServers, kProduceTopic))
  .name(s"Sink -> $kProduceTopic")
Edit:
The way I'm testing this is with integration tests using docker compose. I produce events to Kafka => the Flink job consumes them and sinks to Kafka => I check the contents of the Kafka topic.
When I put a 30-second sleep between sends, event3 and event4 are dropped. This is the behaviour I was expecting.
val producer = new Producer(producerTopic)
val consumer = new Consumer(consumerTopic, groupId)
producer.send(event1)
producer.send(event2)
Thread.sleep(30000)
producer.send(event3)
Thread.sleep(30000)
producer.send(event4)
val received: Iterable[(String, EnrichedProcess)] = consumer.receive[EnrichedProcess]()
But what is even more curious now is why, when I put a sleep of 10 seconds instead of 30, I get only the first situation, with event3 and event4 aggregated rather than dropped. The watermark should already have been updated by then, since the default interval of the periodic watermark generator is 200 ms.
Executive summary:
Non-determinism in event-time-based logic with Flink comes from mixing processing time with event time -- as happens with periodic watermark generators and idleness detection. Only if you never have late events or idle sources can you be sure of deterministic results.
More details:
While you would expect event3 to be late, it will only truly be late if a large enough watermark has managed to arrive first. With the forBoundedOutOfOrderness strategy there's no guarantee of that -- this is a periodic watermark generator that produces watermarks every 200 msec (by default). So it could be that a watermark based on the timestamp of event2 is produced between event3 and event4. That's one possible explanation; there may be others depending on the exact circumstances. For example, with all that sleeping going on, one of the parallel instances of the watermark generator is idle for at least a minute, which may be involved in producing the behavior being observed (depending on the value of idleness, etc).
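To make the timing dependence concrete, here is a minimal sketch in plain Scala (no Flink dependency) of how a periodic, bounded-out-of-orderness generator interacts with a lateness check. Everything in it (PeriodicWatermarkSketch, Step, droppedEvents, at) is a hypothetical name made up for illustration, and it assumes a single parallel instance, a window offset of 0, and no allowed lateness. It is a simplified model of the behavior described above, not Flink's actual implementation.

```scala
object PeriodicWatermarkSketch {
  // A step is either an incoming event or a periodic "emit watermark" tick.
  sealed trait Step
  final case class Event(name: String, tsMs: Long) extends Step
  case object PeriodicEmit extends Step

  val outOfOrdernessMs: Long = 10 * 60 * 1000L // 10 minutes, as in the question
  val windowSizeMs: Long = 60 * 60 * 1000L     // 1 hour tumbling windows

  // Hypothetical helper: milliseconds since midnight for hour h, minute m.
  def at(h: Int, m: Int): Long = (h * 60L + m) * 60 * 1000L

  // Returns the names of the events that would be dropped as late.
  def droppedEvents(steps: Seq[Step]): Seq[String] = {
    var maxSeen = Long.MinValue + outOfOrdernessMs + 1 // avoids underflow below
    var watermark = Long.MinValue
    val dropped = Seq.newBuilder[String]
    steps.foreach {
      case Event(name, ts) =>
        // Last timestamp that still belongs to the event's window.
        val windowMaxTs = ts - Math.floorMod(ts, windowSizeMs) + windowSizeMs - 1
        // Late only if an already-emitted watermark has passed the window.
        if (windowMaxTs <= watermark) dropped += name
        maxSeen = math.max(maxSeen, ts)
      case PeriodicEmit =>
        // The watermark only advances when the periodic tick happens.
        watermark = math.max(watermark, maxSeen - outOfOrdernessMs - 1)
    }
    dropped.result()
  }
}
```

In this model, if a tick lands between event2 and event3, both event3 and event4 are dropped. If no tick (or no sufficiently large watermark) reaches the window before them, neither is late, and they end up in the same 05:00 window and are aggregated together.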
More background:
With parallelism > 1, there are multiple, independent instances of the watermark strategy, each doing its own thing based on the events it processes.
Operators with multiple input channels, such as the keyed window, will combine these watermarks by taking the minimum of the incoming watermarks (from all non-idle channels) as their own watermark.
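The combining rule can be sketched in a few lines of plain Scala. The names here (CombinedWatermarkSketch, Channel, combined) are hypothetical, and returning None when every channel is idle is a simplifying assumption of this sketch rather than a description of Flink's internals.

```scala
object CombinedWatermarkSketch {
  // One upstream input channel as seen by the window operator.
  final case class Channel(watermarkMs: Long, idle: Boolean)

  // The operator's watermark is the minimum over all non-idle channels.
  // None means every channel is idle, so there is nothing to advance on.
  def combined(channels: Seq[Channel]): Option[Long] = {
    val active = channels.filterNot(_.idle).map(_.watermarkMs)
    if (active.isEmpty) None else Some(active.min)
  }
}
```

For example, if one source instance has seen event2 while another has seen nothing yet and is not marked idle, the combined watermark stays at the second instance's initial value, so no window fires and nothing is considered late. Once that second instance is marked idle (after the withIdleness timeout), the combined watermark can jump forward, which is one way the sleep durations in the test could change the outcome.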
Answering the original question:
Does Flink retain the state for windows that have already been closed? No. Once the allowed lateness (if any) has expired, the state for an event time window is purged.
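Put differently, deciding whether an event is late needs no history of closed windows; it is just a comparison between the end of the event's window and the current watermark. Below is a rough sketch of that arithmetic for the timestamps in the question, in plain Scala. LatenessSketch and its members are hypothetical names, the window offset is assumed to be 0 (the question's windowStartingOffsetMinutes is not given), and the formulas are a simplified model of a tumbling event-time window with a bounded-out-of-orderness watermark.

```scala
import java.time.{Duration, Instant}

object LatenessSketch {
  val windowSizeMs: Long = Duration.ofHours(1).toMillis
  val outOfOrdernessMs: Long = Duration.ofMinutes(10).toMillis

  def ms(iso: String): Long = Instant.parse(iso).toEpochMilli

  // Exclusive end of the tumbling window containing tsMs (offset assumed 0).
  def windowEnd(tsMs: Long): Long =
    tsMs - Math.floorMod(tsMs, windowSizeMs) + windowSizeMs

  // Watermark emitted once maxSeenTsMs is the largest timestamp seen so far.
  def watermarkAfter(maxSeenTsMs: Long): Long =
    maxSeenTsMs - outOfOrdernessMs - 1

  // A window is late once the watermark has reached its last timestamp
  // plus the allowed lateness; no record of earlier windows is consulted.
  def isWindowLate(windowEndMs: Long, currentWatermarkMs: Long, allowedLatenessMs: Long = 0L): Boolean =
    (windowEndMs - 1) + allowedLatenessMs <= currentWatermarkMs
}
```

With these assumptions, event2 (09:25) leads to a watermark of 09:14:59.999. Against that watermark the 05:00-06:00 window of event3 and event4 is late, so they are dropped. Against a watermark that has not yet advanced, the same window is not late, so a fresh window is created for them.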