为什么我的水印不在我的Apache Flink Keyed Stream中前进?
我目前正在将Apache Flink 1.13.2与Java一起用于流媒体应用程序。我正在使用没有窗口功能的键功能。尽管我的水印没有前进,但我已经实施了水印策略和autoWatermark Interval
配置。
我通过使用Flink Web UI并在我的EventProcessor
keyedProcessFunction
中打印当前水印进行了仔细检查,但是水印会不断设置为很大的负数号码-9223372036854775808
(最低水印)。
env.getConfig().setAutoWatermarkInterval(1000);
WatermarkStrategy<EventPayload> watermarkStrategy = WatermarkStrategy
.<EventPayload>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
DataStream<EventPayload> deserialized = input
.assignTimestampsAndWatermarks(watermarkStrategy)
.flatMap(new Deserializer());
DataStream<EnrichedEventPayload> resultStream =
AsyncDataStream.orderedWait(deserialized, new Enrichment(), 5, TimeUnit.SECONDS, 100);
DataStream<Session> eventsStream = resultStream
.filter(EnrichedEventPayload::getIsEnriched)
.keyBy(EnrichedEventPayload::getId)
.process(new EventProcessor());
我什至尝试将WaterMarkStrategy
添加到使用keyby
的流中(并调整以匹配的类型),但仍然没有运气。
DataStream<Session> eventsStream = resultStream
.filter(EnrichedEventPayload::getIsEnriched)
.keyBy(EnrichedEventPayload::getId)
.assignTimestampsAndWatermarks(watermarkStrategy)
.process(new EventProcessor());
我还尝试使用自己的类实施WaterMarkStrategy
并在OneVent
函数上设置断点,以确保发布新的水印,尽管它仍然没有前进(并且任何相关的关联计时器没有开火)。
任何帮助将不胜感激!
I am currently using Apache Flink 1.13.2 with Java for my streaming application. I am using a keyed function with no window function. I have implemented a watermark strategy and autoWatermarkInterval
config per the documentation, although my watermark is not advancing.
I have double-checked this by using the Flink web UI and printing the current watermark in my EventProcessor
KeyedProcessFunction
but the watermark is constantly set to a very large negative number -9223372036854775808
(lowest possible watermark).
env.getConfig().setAutoWatermarkInterval(1000);
WatermarkStrategy<EventPayload> watermarkStrategy = WatermarkStrategy
.<EventPayload>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
DataStream<EventPayload> deserialized = input
.assignTimestampsAndWatermarks(watermarkStrategy)
.flatMap(new Deserializer());
DataStream<EnrichedEventPayload> resultStream =
AsyncDataStream.orderedWait(deserialized, new Enrichment(), 5, TimeUnit.SECONDS, 100);
DataStream<Session> eventsStream = resultStream
.filter(EnrichedEventPayload::getIsEnriched)
.keyBy(EnrichedEventPayload::getId)
.process(new EventProcessor());
I even tried to add the WatermarkStrategy
to the stream where it is using keyBy
(and adjusting the types to match) but still no luck.
DataStream<Session> eventsStream = resultStream
.filter(EnrichedEventPayload::getIsEnriched)
.keyBy(EnrichedEventPayload::getId)
.assignTimestampsAndWatermarks(watermarkStrategy)
.process(new EventProcessor());
I have also tried using my own class implementing WatermarkStrategy
and set breakpoints on the onEvent
function to ensure the new watermark was being emitted, although it still did not advance (and any associated timers did not fire).
Any help would be greatly appreciated!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
如果水印策略的平行实例之一是空闲的(即,如果没有事件流过它),则会发生这种情况。在水印策略上使用
withIdleness(...)
选项将是解决此问题的一种方法。This will happen if one of the parallel instances of the watermark strategy is idle (i.e., if there are no events flowing through it). Using the
withIdleness(...)
option on the watermark strategy would be one way to solve this.