Flink KafkaSource reading all messages from a topic

Posted on 2025-01-31 20:51:15


My goal is to read all messages from a Kafka topic using Flink KafkaSource. I have tried executing in both batch and streaming modes. The problem is the following:
I have to use a Sink that contains a bug when I set env.setParallelism higher than 2. Thus I set, for example:
streamExecutionEnvironment.setParallelism(1);

The Kafka topic that I want to consume contains 3 partitions.
Here is the code snippet I have:

KafkaSourceBuilder<Request> builder = KafkaSource.builder();
    builder.setBootstrapServers(kafkaBrokers);
    builder.setProperty("partition.discovery.interval.ms", "10000");
    builder.setTopics(topic);
    builder.setGroupId(groupId);
    builder.setBounded(OffsetsInitializer.latest());
    builder.setStartingOffsets(OffsetsInitializer.earliest());
    builder.setDeserializer(KafkaRecordDeserializationSchema.of(deserializer));
    KafkaSource<Request> source = builder.build(); // the source passed to fromSource(...) below

DataStreamSource<Request> streamSource = streamExecutionEnvironment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    streamSource.map(new MyMapper())
            .addSink(new Sink(props)).setParallelism(3) //by this setting I expected to have exactly 3 consumers - per partition/split, but it doesn't work when I do not set anything as well
            .name("Flink " + context.getJobDetail().getKey());

This code is supposed to run within a Spring Boot application that will be dockerized. I configured a Quartz job that is executed periodically, and streamExecutionEnvironment is a local environment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

At this point there are already more than 10M messages in the topic.
When the job is executed I can see in the log:

INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-2
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-0
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-1

Then they consume about 1M messages in total and stop consuming, and for all 3 splits I can see:

[ -> Map (1/1)#0] o.a.f.c.b.s.reader.fetcher.SplitFetcher  : Finished reading from splits [request-1]

Thus, they do not fully consume the topic, only part of it.
When the Quartz job is re-triggered, it again starts reading from OffsetsInitializer.earliest(); it consumes duplicate messages but also new ones, not only messages newly added to the topic, but also some messages that weren't consumed during the previous execution.

I have also tried renaming the consumer group to rule out an offsets problem, in case the consumer committed offsets after the previous run.

My question is: how can I configure the data stream so that it fully reads the topic? Is my problem related to the setParallelism(1) setting or to parallelism in general, to the consumer group configuration, or to something else?
Please give me any suggestions for troubleshooting the problem.


Comments (1)

︶葆Ⅱㄣ 2025-02-07 20:51:15


The problem is related to

builder.setBounded(OffsetsInitializer.latest());

This line makes the source bounded: it reads each partition only up to the latest offset seen at the start of the job, and then stops consuming.
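If the goal is to read the whole topic and keep consuming records that arrive later, one option is to drop the setBounded(...) call so the source stays unbounded. Below is a minimal sketch, reusing the kafkaBrokers, topic, groupId and deserializer variables from the question; starting from committedOffsets with an EARLIEST fallback is my own suggestion for letting the periodic Quartz re-runs resume where the previous run stopped instead of re-reading from the beginning:

    // Sketch: an unbounded KafkaSource. Without setBounded(...) the source
    // never stops at the offsets that existed when the job started.
    // OffsetResetStrategy comes from org.apache.kafka.clients.consumer.
    KafkaSource<Request> source = KafkaSource.<Request>builder()
            .setBootstrapServers(kafkaBrokers)
            .setTopics(topic)
            .setGroupId(groupId)
            // Resume from the group's committed offsets; on the very first
            // run, when nothing is committed yet, fall back to EARLIEST.
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setDeserializer(KafkaRecordDeserializationSchema.of(deserializer))
            .build();

Note that the Flink Kafka source commits offsets when checkpoints complete, so checkpointing has to be enabled, e.g. streamExecutionEnvironment.enableCheckpointing(10_000), for committedOffsets to have anything to resume from on the next trigger.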
