Flink KafkaSource reading all messages from a Kafka topic
My goal is to read all messages from a Kafka topic using the Flink KafkaSource. I have tried executing in both batch and streaming modes. The problem is the following:
I have to use a Sink that contains a bug when env.setParallelism is set higher than 2, so I set, for example: streamExecutionEnvironment.setParallelism(1);
The Kafka topic that I want to consume contains 3 partitions.
This is a code snippet that I have:
KafkaSourceBuilder<Request> builder = KafkaSource.builder();
builder.setBootstrapServers(kafkaBrokers);
builder.setProperty("partition.discovery.interval.ms", "10000");
builder.setTopics(topic);
builder.setGroupId(groupId);
builder.setBounded(OffsetsInitializer.latest());
builder.setStartingOffsets(OffsetsInitializer.earliest());
builder.setDeserializer(KafkaRecordDeserializationSchema.of(deserializer));
KafkaSource<Request> source = builder.build();
DataStreamSource<Request> streamSource = streamExecutionEnvironment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
streamSource.map(new MyMapper())
    .addSink(new Sink(props)).setParallelism(3) // with this setting I expected exactly 3 consumers, one per partition/split; it also doesn't work when I don't set anything here
    .name("Flink " + context.getJobDetail().getKey());
This code is supposed to run within a Spring Boot application that will be dockerized. I configured a Quartz job that is executed periodically, and streamExecutionEnvironment is a local environment: StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
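A minimal sketch of that setup, assuming a hypothetical FlinkConsumeJob class (the Quartz and Flink API calls are real; the class name and the pipeline placeholder are illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class FlinkConsumeJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // Each Quartz trigger spins up a fresh local mini-cluster inside the Spring Boot JVM.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1); // workaround for the buggy sink mentioned above
        try {
            // build the KafkaSource pipeline from the snippet above, then run it
            env.execute("Flink " + context.getJobDetail().getKey());
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }
}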
At this point there are already messages within the topic, more than 10M.
When the job is executed I can see in the log:
[ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-2
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-0
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-1
Then they consume about 1M messages in total and stop, and for all 3 splits I can see:
[ -> Map (1/1)#0] o.a.f.c.b.s.reader.fetcher.SplitFetcher : Finished reading from splits [request-1]
Thus, they do not fully consume the topic, only part of it.
When the Quartz job is re-triggered, it again starts reading from OffsetsInitializer.earliest(); the consumers read duplicate messages, but also new ones: not only messages newly added to the topic, but also some messages that weren't consumed during the previous execution.
I have also tried renaming the consumer group, to rule out an offsets problem in case the consumers committed their offsets after the previous consumption.
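A minimal sketch of the committed-offsets alternative, assuming the same builder as above: starting from the group's committed offsets (falling back to EARLIEST) would avoid re-reading from the beginning on each run, but note that Flink commits offsets back to Kafka only when checkpointing is enabled.

import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Resume from offsets committed by this consumer group; if the group has
// no committed offsets yet, fall back to the earliest available offset.
builder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));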
My question is: how can I configure the data stream so that it fully reads the topic? Is my problem related to the setParallelism(1) setting or to parallelism in general, to the consumer group configuration, or to anything else?
Please give me any suggestions on troubleshooting the problem.
Answer:
The problem is related to this line:
builder.setBounded(OffsetsInitializer.latest());
It tells the KafkaSource to read each partition only up to the last offset observed at the start of the job. Once those offsets are reached, it stops consuming further messages.
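A minimal sketch of both configurations, reusing the builder calls and names (Request, kafkaBrokers, topic, groupId, deserializer) from the question:

// Bounded: read everything that is in the topic at job start, then stop.
KafkaSource<Request> boundedSource = KafkaSource.<Request>builder()
        .setBootstrapServers(kafkaBrokers)
        .setTopics(topic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest()) // stop at the offsets observed at startup
        .setDeserializer(KafkaRecordDeserializationSchema.of(deserializer))
        .build();

// Unbounded: keep consuming indefinitely, including messages produced
// after the job starts; simply omit setBounded(...).
KafkaSource<Request> unboundedSource = KafkaSource.<Request>builder()
        .setBootstrapServers(kafkaBrokers)
        .setTopics(topic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(deserializer))
        .build();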