Spring Integration Kafka listener thread reads from multiple partitions when concurrency = partition count
I set up a Spring Integration flow to process a topic with 3 partitions and set the listener container's concurrency to 3. As expected, I see three threads processing batches from all 3 partitions. However, in some cases one of the listener threads may process a single batch containing messages from multiple partitions. My data is partitioned in Kafka by an ID so that it can be processed concurrently with other IDs, but never with the same ID on another thread (which is what I was surprised to observe happening). From reading the docs, I thought each thread would be assigned one partition. I'm using a KafkaMessageDrivenChannelAdapter like this:
private static final Class<List<MyEvent>> payloadClass =
        (Class<List<MyEvent>>) (Class) List.class;

public KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<String, MyEvent> myChannelAdapterSpec() {
    return Kafka.messageDrivenChannelAdapter(tstatEventConsumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.batch, "my-topic") // 3 partitions
            .configureListenerContainer(c -> {
                c.ackMode(ContainerProperties.AckMode.BATCH);
                c.id(_ID);
                c.concurrency(3);
                RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(
                        (record, exception) -> log.error("failed to handle record at offset {}: {}",
                                record.offset(), record.value(), exception),
                        new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 2));
                c.errorHandler(errorHandler);
            });
}
@Bean
public IntegrationFlow myIntegrationFlow() {
    return IntegrationFlows.from(myChannelAdapterSpec())
            .handle(payloadClass, (payload, headers) -> {
                service.performSink(payload);
                return null;
            })
            .get();
}
How do I set this up so that each listener container thread only processes messages from one partition?
That's not how consumer groups work. If you would like to have "sticky" consumers, consider using manual assignment. See the channel adapter factory based on TopicPartitionOffset... topicPartitions. Then it is not treated as a consumer group, and you have to create several channel adapters, each pointing to its specific partition. All of these channel adapters may emit messages to the same MessageChannel.
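A minimal sketch of that manual-assignment approach, assuming the question's `tstatEventConsumerFactory()`, `payloadClass`, and `service` are available in the same configuration class; the bean names (`StickyPartitionConfig`, `sinkChannel`, `partition0Flow`, etc.) are illustrative, not from the original post. With a `TopicPartitionOffset`, the listener container uses `Consumer.assign(...)` instead of group management, so each adapter's single thread only ever sees its own partition:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.messaging.MessageChannel;

@Configuration
public class StickyPartitionConfig {

    // One adapter per partition, manually assigned; no consumer group rebalancing.
    private IntegrationFlow flowForPartition(int partition) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(tstatEventConsumerFactory(),
                        KafkaMessageDrivenChannelAdapter.ListenerMode.batch,
                        new TopicPartitionOffset("my-topic", partition)))
                .channel("sinkChannel")
                .get();
    }

    @Bean
    public IntegrationFlow partition0Flow() { return flowForPartition(0); }

    @Bean
    public IntegrationFlow partition1Flow() { return flowForPartition(1); }

    @Bean
    public IntegrationFlow partition2Flow() { return flowForPartition(2); }

    // All three adapters emit to this same channel.
    @Bean
    public MessageChannel sinkChannel() {
        return new DirectChannel();
    }

    // The downstream handler is unchanged from the question.
    @Bean
    public IntegrationFlow sinkFlow() {
        return IntegrationFlows.from("sinkChannel")
                .handle(payloadClass, (payload, headers) -> {
                    service.performSink(payload);
                    return null;
                })
                .get();
    }
}
```

Note that concurrency stays at its default of 1 on each container here; parallelism comes from having one single-threaded adapter per partition, which is what keeps a given ID pinned to one thread.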