Flink checkpoint not replaying the Kafka events that were in process during the savepoint/checkpoint
I want to test end-to-end exactly-once processing in Flink. My job is:
kafka-source -> mapper1 -> mapper2 -> kafka-sink
I put a Thread.sleep(100000) in mapper1 and ran the job. I took a savepoint while stopping the job, then removed the Thread.sleep(100000) from mapper1, expecting the event to be replayed since it had never reached the sink. But that didn't happen; the job just waits for new events.
My Kafka source:
KafkaSource.<String>builder()
.setBootstrapServers(consumerConfig.getBrokers())
.setTopics(consumerConfig.getTopic())
.setGroupId(consumerConfig.getGroupId())
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("commit.offsets.on.checkpoint", "true")
.build();
My Kafka sink:
KafkaSink.<String>builder()
.setBootstrapServers(producerConfig.getBootstrapServers())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(producerConfig.getTopic())
.setValueSerializationSchema(new SimpleStringSchema()).build())
.build();
My environment setup for the Flink job:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(2000);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
environment.getCheckpointConfig().setCheckpointTimeout(60000);
environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
environment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getCheckpointConfig().setCheckpointTimeout(1000);
environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
environment.getCheckpointConfig().enableUnalignedCheckpoints();
environment.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
Configuration configuration = new Configuration();
configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
environment.configure(configuration);
What am I doing wrong here?
I expect that any event that was in flight when the job was cancelled/stopped should be reprocessed when the job restarts.
EDIT 1:
I observed that Kafka was showing an offset lag for my Flink kafka-source consumer group. I assume this means my checkpointing is behaving correctly; is that right?
I also observed that when I restarted the job from the checkpoint, it did not start consuming from the remaining offsets, even though the consumer offset reset is set to EARLIEST. I had to send more events to trigger consumption on the kafka-source side, and only then did it consume all the events.
1 Answer
For exactly-once, you must provide a TransactionalIdPrefix that is unique across all applications running against the same Kafka cluster (this is a change compared to the legacy FlinkKafkaProducer).
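A minimal sketch of the sink with the prefix set, reusing the builder configuration from the question (the prefix string and the transaction timeout value here are illustrative assumptions, not values from the original post):

KafkaSink.<String>builder()
        .setBootstrapServers(producerConfig.getBootstrapServers())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Hypothetical prefix; must be unique per application on this Kafka cluster.
        .setTransactionalIdPrefix("my-flink-job")
        // Assumption: keep the producer transaction timeout within the broker's
        // transaction.max.timeout.ms (15 minutes by default), since Flink's
        // default transaction timeout (1 hour) exceeds the broker default.
        .setProperty("transaction.timeout.ms", "900000")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(producerConfig.getTopic())
                .setValueSerializationSchema(new SimpleStringSchema()).build())
        .build();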
When resuming from a checkpoint, Flink always uses the offsets stored in the checkpoint rather than those configured in the code or stored in the broker.
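Regarding EDIT 1: the starting-offsets initializer is only consulted on a fresh start with no restored state. A sketch of the source configured to fall back to the group's committed offsets (or earliest, if none exist) on a fresh start, reusing the names from the question; OffsetResetStrategy comes from the Kafka client library (org.apache.kafka.clients.consumer.OffsetResetStrategy):

KafkaSource.<String>builder()
        .setBootstrapServers(consumerConfig.getBrokers())
        .setTopics(consumerConfig.getTopic())
        .setGroupId(consumerConfig.getGroupId())
        // Only applies when there is no checkpoint/savepoint to restore from;
        // a restore always overrides this initializer.
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("commit.offsets.on.checkpoint", "true")
        .build();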