How do I simulate an error for setBatchErrorHandler() in a Kafka consumer container?

Posted on 2025-01-10 04:40:33

I have a Kafka consumer application that uses ConcurrentKafkaListenerContainerFactory, and I read that setBatchErrorHandler is used to handle errors that occur during batch listening. Below is my code for configuring the factory and the receiving method. How can I simulate an error so that the BatchErrorHandler is invoked?

@Bean(name = "xxxconsumer")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(14);
    factory.setBatchListener(true);
    factory.setBatchErrorHandler(new BatchLoggingErrorHandler());
    return factory;
}

@KafkaListener(
        topics = "filler.name.1",
        containerFactory = "xxxconsumer"
)
public void receive(@Payload List<String> messages) {
    for (int i = 0; i < messages.size(); i++) {
        log.info("Received message='{}' ", messages.get(i));

        transform(messages.get(i));
    }

    log.info("All batch messages received");
}

I'm using spring-kafka v2.3.7.


Comments (1)

仙气飘飘 2025-01-17 04:40:33

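You can throw a RuntimeException from the listener to simulate an error and watch the BatchLoggingErrorHandler kick in. The container catches exceptions thrown by the listener method and passes them, together with the batch being processed, to the configured batch error handler.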
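
To make that flow concrete, here is a minimal plain-Java sketch of the idea. It does not use spring-kafka at all: SimpleBatchErrorHandler, SimpleBatchContainer and BatchErrorDemo are hypothetical names invented for illustration, and the real container does considerably more (polling, offset commits, threading). It only models the step where an exception thrown by the listener ends up in the error handler along with the whole batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified stand-in for a batch error handler; NOT the spring-kafka interface. */
interface SimpleBatchErrorHandler {
    void handle(Exception thrown, List<String> batch);
}

/** Hypothetical model of the dispatch step inside a listener container. */
class SimpleBatchContainer {
    private final Consumer<List<String>> listener;
    private final SimpleBatchErrorHandler errorHandler;

    SimpleBatchContainer(Consumer<List<String>> listener, SimpleBatchErrorHandler errorHandler) {
        this.listener = listener;
        this.errorHandler = errorHandler;
    }

    /** Delivers one batch; a RuntimeException from the listener is routed to the handler. */
    void dispatch(List<String> batch) {
        try {
            listener.accept(batch);
        } catch (RuntimeException e) {
            errorHandler.handle(e, batch);
        }
    }
}

class BatchErrorDemo {
    /** Dispatches one batch to a listener that always fails and returns what the handler saw. */
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        SimpleBatchContainer container = new SimpleBatchContainer(
                batch -> { throw new RuntimeException("Test exception"); },
                (ex, batch) -> seen.add(ex.getMessage() + " for " + batch.size() + " records"));
        container.dispatch(List.of("My message 0", "My message 1"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Test exception for 2 records]
    }
}
```

In this model the handler is given the complete batch rather than a single record, which is consistent with the real log output further down, where every record of the failed batch is listed.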

Consider this application:

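import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.messaging.handler.annotation.Payload;

@SpringBootApplication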
public class SO71276422 {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(SO71276422.class, args);

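        // Pause before sending (presumably to let the listener containers start)
        try {
            Thread.sleep(10000);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag; Thread.interrupted() would clear it instead
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }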
        KafkaOperations kafkaTemplate = context.getBean("kafkaTemplate", KafkaOperations.class);
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("filler.name.1", "My message " + i);
        }
    }

    Logger log = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(
            topics = "filler.name.1",
            containerFactory = "xxxconsumer"
    )
    public void receive(@Payload List<String> messages) {
        log.info("Received messages: {}", messages);

        for (int i = 0; i < messages.size(); i++) {
            log.info("Received message='{}' ", messages.get(i));
            transform(messages.get(i));
        }
        // Thrown on purpose so the container hands the batch to the BatchErrorHandler
        throw new RuntimeException("Test exception");
    }

    private void transform(String s) {
        log.info("Transforming message {}", s);
    }

    @Configuration
    static class MyConfiguration {

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "grp_STRING");
            // This consumer factory is injected into the listener container factory below
            return new DefaultKafkaConsumerFactory<>(config);
        }

        @Bean(name = "xxxconsumer")
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
            final ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            factory.setConcurrency(14);
            factory.setBatchListener(true);
            factory.setBatchErrorHandler(new BatchLoggingErrorHandler());
            return factory;
        }
    }

}

This produces the following logs:

2022-02-28 15:22:54.795  INFO 19058 --- [ntainer#0-0-C-1] 1276422$EnhancerBySpringCGLIB$7449d5be : Received messages: [My message 0, My message 1, My message 2, My message 3, My message 4, My message 5, My message 6, My message 7, My message 8, My message 9]

...

2022-02-28 15:22:54.796  INFO 19058 --- [ntainer#0-0-C-1] 1276422$EnhancerBySpringCGLIB$7449d5be : Transforming message My message 9
2022-02-28 15:22:54.799 ERROR 19058 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:
filler.name.1-0@40
filler.name.1-0@41
filler.name.1-0@42
filler.name.1-0@43
filler.name.1-0@44
filler.name.1-0@45
filler.name.1-0@46
filler.name.1-0@47
filler.name.1-0@48
filler.name.1-0@49
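
Each entry under "Error while processing:" reads as topic-partition@offset, so filler.name.1-0@40 is the record at offset 40 on partition 0 of topic filler.name.1. As a small aid for reading such output, the sketch below parses one entry. RecordRef is a hypothetical helper written for this illustration and is not part of spring-kafka; the layout is taken from the log output shown above.

```java
/** Hypothetical helper that parses entries such as "filler.name.1-0@40". */
final class RecordRef {
    final String topic;
    final int partition;
    final long offset;

    private RecordRef(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    /** Splits on the last '@' and on the last '-' before it, since topic names may contain '-'. */
    static RecordRef parse(String entry) {
        int at = entry.lastIndexOf('@');
        int dash = entry.lastIndexOf('-', at);
        if (at < 0 || dash < 0) {
            throw new IllegalArgumentException("Unexpected entry: " + entry);
        }
        String topic = entry.substring(0, dash);
        int partition = Integer.parseInt(entry.substring(dash + 1, at));
        long offset = Long.parseLong(entry.substring(at + 1));
        return new RecordRef(topic, partition, offset);
    }

    public static void main(String[] args) {
        RecordRef ref = RecordRef.parse("filler.name.1-0@40");
        // prints topic=filler.name.1 partition=0 offset=40
        System.out.println("topic=" + ref.topic + " partition=" + ref.partition + " offset=" + ref.offset);
    }
}
```

All ten records of the batch (offsets 40 to 49) appear in the log because the handler receives the whole batch, not only the record being processed when the exception was thrown.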