Flink missing data with windowed processor (event-time windows) and Kafka source

Published 2025-01-22 20:48:01

We have a streaming job with 20 separate pipelines, each pipeline having one or many Kafka topic sources, with some pipelines using a windowed processor and others a non-windowed processor.

We are noticing data loss for the windowed-processor pipelines when the job goes down and takes some time to recover, or when the job needs to be restarted.

  • I have set a UID for all of the operators, and I can see in the logs that offsets are being restored from the savepoint for the Kafka consumer operator.

  • We are using BoundedOutOfOrdernessTimestampExtractor to assign watermarks based on event time.

import java.io.Serializable;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

@Slf4j // assumed source of the `log` field used below
public class KafkaEventTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Event> implements Serializable {

    public KafkaEventTimestampExtractor(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Event element) {
        try {
            log.info("event to be processed, event:{}", new ObjectMapper().writeValueAsString(element));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        // Parse the event's timestamp field; values with fewer than 13 digits are
        // treated as seconds and normalized to epoch milliseconds.
        long ts = (long) Double.parseDouble(element.getTs());
        ts = Long.toString(ts).length() < 13 ? ts * 1000 : ts;
        return ts;
    }
}

The pipeline config looks something like this.

  • NON-WINDOWED
SourceUtil
  .getEventDataStream(env, kafkaSourceSet)
  .process(new S3EventProcessor()).uid(“…..**)
  .addSink();
  • WINDOWED
SourceUtil
  .getEventDataStream(env, kafkaSourceSet)
  .assignTimestampsAndWatermarks(
    new KafkaEventTimestampExtractor(Time.seconds(4)))
  .windowAll(TumblingEventTimeWindows.of(
    Time.milliseconds(kafkaSourceSet.bufferWindowSize)))
  .process(new S3EventProcessor()).uid(“…..**)
  .addSink();
  • Let's say the job is down for 30 minutes. In that case, the pipelines where we do not use a window processor do not miss any data, but partial data is missed from the windowed processor for those 30 minutes.

  • When we increase the out-of-order event delay (the 4-second bound in the timestamp extractor) to 30 minutes, events are no longer missed as long as the application is back up within 30 minutes. That gets us nowhere near a solution, though: a delay of more than 1 minute is infeasible for us, and there would be too many live windows, which would mean a huge infra change for us.


Comments (2)

倾城泪 2025-01-29 20:48:01

The only scenario I can imagine that might explain this is if the event timestamps are affected by the outage. Then a 30-minute outage would cause a 30-minute gap in the timestamps, and with out-of-order ingestion, a 4-second bounded-out-of-orderness strategy will yield some late events that will be dropped by the window.
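
(Not from the answer above, just an illustration of the behaviour it describes.) If late records are indeed the cause, one way to make them visible instead of letting the window drop them silently is a late-data side output plus some allowed lateness. The fragment below follows the style of the question's snippets, reuses SourceUtil, kafkaSourceSet, KafkaEventTimestampExtractor and S3EventProcessor from the question, and assumes purely for illustration that the window processor emits Event elements:

OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Event> windowed = SourceUtil
    .getEventDataStream(env, kafkaSourceSet)
    .assignTimestampsAndWatermarks(new KafkaEventTimestampExtractor(Time.seconds(4)))
    .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(kafkaSourceSet.bufferWindowSize)))
    .allowedLateness(Time.minutes(1))   // keep window state around a little longer for stragglers
    .sideOutputLateData(lateTag)        // records later than watermark + lateness go here instead of being dropped
    .process(new S3EventProcessor());

// Late records can then be inspected or sunk separately for reconciliation.
windowed.getSideOutput(lateTag).print();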

黑凤梨 2025-01-29 20:48:01

This was happening due to a mistake in my pipeline: instead of attaching the timestamp assigner to the FlinkKafkaConsumer, it was added to the data stream generated from the FlinkKafkaConsumer.
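
A minimal sketch of what that change could look like, assuming the legacy FlinkKafkaConsumer API that the question's BoundedOutOfOrdernessTimestampExtractor targets; the topic name, EventDeserializationSchema and kafkaProps below are placeholders, not taken from the original pipeline:

FlinkKafkaConsumer<Event> consumer =
    new FlinkKafkaConsumer<>("events-topic", new EventDeserializationSchema(), kafkaProps);

// Attaching the assigner to the consumer itself means watermarks are generated
// per Kafka partition inside the source, not on the already-merged stream.
consumer.assignTimestampsAndWatermarks(new KafkaEventTimestampExtractor(Time.seconds(4)));

env.addSource(consumer)
    .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(kafkaSourceSet.bufferWindowSize)))
    .process(new S3EventProcessor());
    // .uid(...) and sink as in the original pipeline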

This change has fixed the issue on my end for automatic recovery, but in case of a manual restart after any changes to the pipeline, some data is still missed for the last window that was open when the job stopped.

Note: we are using checkpoints for manual recovery.
As per the docs, checkpoints are ideal for automatic recovery in case of job failures.
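
One caveat worth noting (my assumption, not stated in the answer): a checkpoint is only available for a manual restart if it is retained when the job is cancelled, e.g. via externalized checkpoints. A sketch against the older CheckpointConfig API, with an illustrative interval:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(60_000); // illustrative interval: checkpoint every 60 s
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// A retained checkpoint directory (.../chk-<n>) can then be passed to
// `flink run -s <path>` when restarting manually, just like a savepoint path.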

Any note on this would help: do we need to create a savepoint whenever we make changes to the pipeline and restart it manually, or can we make a complete recovery from the checkpoint alone?

Our only concern with using a savepoint is the reprocessing of the same events that might happen, which is not ideal for us in a few cases.
