Spring Integration Kafka listener thread reads multiple partitions when concurrency = partition count

Posted on 2025-02-01 13:48:36

I set up a Spring Integration flow to process a topic with 3 partitions and set the listener container's concurrency to 3. As expected, I see three threads processing batches from all 3 partitions. However, in some cases one of the listener threads processes a single batch containing messages from multiple partitions. My data is partitioned in Kafka by an id so that different ids can be processed concurrently, but the same id should never be processed on two threads at once (which, to my surprise, is what I observed happening). From reading the docs, I thought each thread would be assigned one partition. I'm using a KafkaMessageDrivenChannelAdapter like this:

private static final Class<List<MyEvent>> payloadClass = (Class<List<MyEvent>>)(Class) List.class;

public KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<String, MyEvent> myChannelAdapterSpec() {
    return Kafka.messageDrivenChannelAdapter(tstatEventConsumerFactory(),
            KafkaMessageDrivenChannelAdapter.ListenerMode.batch, "my-topic") // 3 partitions
            .configureListenerContainer(c -> {
                c.ackMode(ContainerProperties.AckMode.BATCH);
                c.id(_ID);
                c.concurrency(3);
                RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(
                        (record, exception) -> log.error("failed to handle record at offset {}: {}",
                                record.offset(), record.value(), exception),
                        new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 2)
                );
                c.errorHandler(errorHandler);
            });
}

@Bean
public IntegrationFlow myIntegrationFlow() {
    return IntegrationFlows.from(myChannelAdapterSpec())
            .handle(payloadClass, (payload, headers) -> {
                service.performSink(payload);
                return null;
            })
            .get();
}

How do I set this up so that each listener container thread only processes messages from one partition?

Comments (1)

很酷又爱笑 2025-02-08 13:48:36

But is there additionally a way that I can keep from ever getting a batch with messages from multiple partitions, even if a rebalance does occur?

That's not how consumer groups work. If you would like to have "sticky" consumers, consider using manual assignment. See the channel adapter factory based on TopicPartitionOffset... topicPartitions:

/**
 * Create an initial
 * {@link KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec}.
 * @param consumerFactory the {@link ConsumerFactory}.
 * @param listenerMode the {@link KafkaMessageDrivenChannelAdapter.ListenerMode}.
 * @param topicPartitions the {@link TopicPartitionOffset} vararg.
 * @param <K> the Kafka message key type.
 * @param <V> the Kafka message value type.
 * @return the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
 */
public static <K, V>
KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(
        ConsumerFactory<K, V> consumerFactory,
        KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode,
        TopicPartitionOffset... topicPartitions) {

Then it is not going to be treated as a consumer group, and you have to create several channel adapters, each pointing to its specific partition. All of these channel adapters may emit messages to the same MessageChannel.
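As a rough illustration of that suggestion, here is a minimal sketch of three adapters, each manually assigned to one partition of "my-topic" and all sending to one channel. It assumes a Spring Integration 5.x setup like the one in the question (IntegrationFlows, spring-kafka 2.3 or later for TopicPartitionOffset). The class name PinnedPartitionFlows, the channel name mySinkChannel, and the helper partitionFlow are made up for this example; MyEvent and the consumer factory are the ones from the question. Error handling and ack mode are left out for brevity.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.TopicPartitionOffset;

@Configuration
public class PinnedPartitionFlows {

    // Hypothetical name of the channel shared by all three adapters.
    private static final String SINK_CHANNEL = "mySinkChannel";

    // The consumer factory from the question (tstatEventConsumerFactory()), injected here.
    private final ConsumerFactory<String, MyEvent> consumerFactory;

    public PinnedPartitionFlows(ConsumerFactory<String, MyEvent> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    // Builds one flow whose adapter is manually assigned to a single partition.
    // No initial offset is given, so the consumer starts from its committed
    // offset or, if there is none, from whatever auto.offset.reset dictates.
    private IntegrationFlow partitionFlow(int partition) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(
                        consumerFactory,
                        KafkaMessageDrivenChannelAdapter.ListenerMode.batch,
                        new TopicPartitionOffset("my-topic", partition)))
                .channel(SINK_CHANNEL)
                .get();
    }

    @Bean
    public IntegrationFlow partition0Flow() {
        return partitionFlow(0);
    }

    @Bean
    public IntegrationFlow partition1Flow() {
        return partitionFlow(1);
    }

    @Bean
    public IntegrationFlow partition2Flow() {
        return partitionFlow(2);
    }
}
```

With this arrangement, the handler from the question would subscribe to mySinkChannel instead of the adapter directly. Because each adapter has its own listener container with a single manually assigned partition, a batch delivered by one adapter should only ever contain records from that partition, and no rebalance moves partitions between them. The trade-off is that the partition list is fixed in code, so adding partitions to the topic means adding adapters.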