Flink checkpointing not replaying the Kafka events that were in flight during the savepoint/checkpoint


I want to test end-to-end exactly-once processing in Flink. My job is:

Kafka-source -> mapper1 -> mapper2 -> kafka-sink

I put a Thread.sleep(100000) in mapper1 and ran the job. I took a savepoint while stopping the job, then removed the Thread.sleep(100000) from mapper1, expecting the in-flight event to be replayed since it had never reached the sink. But that didn't happen; the job just waits for new events.
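
For context, mapper1 for this test presumably looks something like this minimal sketch (the class name and pass-through logic are illustrative, not from the original job):

import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical mapper1: the sleep keeps an event "in flight" so that
// stopping the job with a savepoint should leave it short of the sink.
public class Mapper1 implements MapFunction<String, String> {
    @Override
    public String map(String value) throws Exception {
        Thread.sleep(100000); // artificial stall, removed after the savepoint
        return value;
    }
}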

My Kafka source:

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(consumerConfig.getBrokers())
        .setTopics(consumerConfig.getTopic())
        .setGroupId(consumerConfig.getGroupId())
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("commit.offsets.on.checkpoint", "true")
        .build();

My Kafka sink:

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(producerConfig.getBootstrapServers())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(producerConfig.getTopic())
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();
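
For completeness, a sketch of how the pieces are presumably wired together (source and sink are the builder results above; environment is the StreamExecutionEnvironment configured in the next snippet; Mapper2 is the hypothetical second mapper):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

environment
        .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
        .map(new Mapper1())
        .map(new Mapper2()) // hypothetical second mapper
        .sinkTo(sink);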

My environment setup for the Flink job:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

environment.enableCheckpointing(2000);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
environment.getCheckpointConfig().setCheckpointTimeout(60000);
environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
environment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Note: this second call overrides the 60000 ms timeout set above with 1000 ms.
environment.getCheckpointConfig().setCheckpointTimeout(1000);
environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
environment.getCheckpointConfig().enableUnalignedCheckpoints();
environment.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
Configuration configuration = new Configuration();
configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
environment.configure(configuration);

What am I doing wrong here?
I want any event that is in flight during the cancellation/stop of the job to be reprocessed when the job restarts.
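
For reference, a typical stop-with-savepoint and resume cycle looks roughly like this (Flink 1.13+ CLI syntax; paths, job id, and jar name are placeholders):

# stop the job and trigger a savepoint
flink stop --savepointPath /tmp/flink-savepoints <jobId>

# resume the modified job from that savepoint
flink run -s <savepointPath> my-job.jar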

EDIT 1:
I observed that Kafka shows an offset lag for my Flink kafka-source consumer group. I assume this means my checkpointing is behaving correctly; is that right?

I also observed that when I restarted the job from a checkpoint, it did not start consuming from the remaining offsets, even though the consumer offset is set to EARLIEST. I had to send more events to trigger consumption on the kafka-source side, and only then were all the events consumed.
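
Since the configuration above retains checkpoints on cancellation, restarting from the last externalized checkpoint uses the same -s flag as a savepoint restore (path is a placeholder):

flink run -s file:///tmp/flink-checkpoints/<job-id>/chk-<n> my-job.jar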


Comments (1)

浅忆流年 2025-02-04 01:44:54


For exactly-once, you must provide a TransactionalIdPrefix that is unique across all applications running against the same Kafka cluster (this is a change compared to the legacy FlinkKafkaProducer):

KafkaSink<T> sink =
        KafkaSink.<T>builder()
                .setBootstrapServers(...)
                .setKafkaProducerConfig(...)
                .setRecordSerializer(...)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("unique-id-for-your-app")
                .build();

When resuming from a checkpoint, Flink always uses the offsets stored in the checkpoint rather than those configured in the code or stored in the broker.
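
In other words, the starting-offsets initializer only applies on a fresh start with no state to restore. A minimal sketch, assuming the standard KafkaSource API, of a source that explicitly falls back to committed group offsets when there is no checkpoint (broker, topic, and group id are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092") // placeholder
        .setTopics("my-topic")                 // placeholder
        .setGroupId("my-group")                // placeholder
        // Used only when starting without a checkpoint/savepoint;
        // on recovery, the offsets stored in the checkpoint always win.
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();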
