Why is my watermark not advancing in my Apache Flink keyed stream?

Posted on 2025-01-31 06:40:47

I am currently using Apache Flink 1.13.2 with Java for my streaming application. I am using a keyed function with no window function. I have implemented a watermark strategy and set the autoWatermarkInterval config per the documentation, but my watermark is not advancing.

I have double-checked this by using the Flink web UI and by printing the current watermark in my EventProcessor KeyedProcessFunction, but the watermark is constantly -9223372036854775808 (Long.MIN_VALUE, the lowest possible watermark).

env.getConfig().setAutoWatermarkInterval(1000);

WatermarkStrategy<EventPayload> watermarkStrategy = WatermarkStrategy
        .<EventPayload>forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

DataStream<EventPayload> deserialized = input
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .flatMap(new Deserializer());

DataStream<EnrichedEventPayload> resultStream =
        AsyncDataStream.orderedWait(deserialized, new Enrichment(), 5, TimeUnit.SECONDS, 100);

DataStream<Session> eventsStream = resultStream
        .filter(EnrichedEventPayload::getIsEnriched)
        .keyBy(EnrichedEventPayload::getId)
        .process(new EventProcessor());

I even tried applying the WatermarkStrategy after the keyBy (adjusting the types to match), but still no luck.

DataStream<Session> eventsStream = resultStream
        .filter(EnrichedEventPayload::getIsEnriched)
        .keyBy(EnrichedEventPayload::getId)
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .process(new EventProcessor());

I have also tried using my own class implementing WatermarkStrategy and set breakpoints in the onEvent method to confirm that a new watermark was being emitted, but it still did not advance (and the associated timers did not fire).

Any help would be greatly appreciated!

Comments (1)

故人爱我别走 2025-02-07 06:40:47

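This will happen if one of the parallel instances of the watermark strategy is idle (i.e., if there are no events flowing through it). A downstream operator takes the minimum of the watermarks from all of its input channels, so a single instance that never emits a watermark keeps the combined value at Long.MIN_VALUE. Using the withIdleness(...) option on the watermark strategy would be one way to solve this.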
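To make the mechanism concrete, here is a minimal plain-Java sketch of the "minimum across inputs" rule. It does not use Flink; the class WatermarkMinDemo and the method combinedWatermark are hypothetical names made up for illustration, and the handling of the all-idle case is a simplification, not Flink's actual behaviour.

```java
public class WatermarkMinDemo {

    /**
     * Combines per-channel watermarks the way a downstream operator conceptually does:
     * take the minimum over all channels that are not marked idle.
     * Hypothetical helper for illustration only, not part of the Flink API.
     */
    static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle channels are ignored when computing the minimum
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        // Simplification (assumption): with no active channel, report "no watermark yet".
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Channel 0 has seen events up to t=5000; channel 1 has never seen an event.
        long[] watermarks = {5000L, Long.MIN_VALUE};

        // Without idleness detection, the silent channel holds everything back.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false}));

        // Once the silent channel is marked idle, the watermark can advance.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, true}));
    }
}
```

In the job from the question, the equivalent change would be to add something like .withIdleness(Duration.ofSeconds(30)) to the WatermarkStrategy chain (the 30 second timeout is an arbitrary example value). If the source has more parallel instances than there are partitions with data, reducing the source parallelism may also help.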
