如何在kafka消费者容器中模拟setBatchErrorHandler()错误?
我有一个使用 ConcurrentKafkaListenerContainerFactory 的 kafka 消费者应用程序,我读到 setBatchErrorHandler 用于在批量监听期间出现错误时处理错误。以下是我用于配置工厂和接收功能的代码。我可以知道如何模拟错误以使用BatchErrorHandler吗?
@Bean(name = "xxxconsumer")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
final ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(14);
factory.setBatchListener(true);
factory.setBatchErrorHandler(new BatchLoggingErrorHandler());
return factory;
}
@KafkaListener(
topics = "filler.name.1",
containerFactory = "xxxconsumer"
)
public void receive(@Payload List<String> messages) {
for (int i = 0; i < messages.size(); i++) {
log.info("Received message='{}' ", messages.get(i));
transform(messages.get(i));
}
log.info("All batch messages received");
}
我正在使用 spring-kafka v2.3.7。
I've a kafka consumer application that uses ConcurrentKafkaListenerContainerFactory and I read that setBatchErrorHandler is used to handle errors when there is an error during batch listening. Below is my codes for configuring the factory and receiving function. May I know how can I simulate the error to use BatchErrorHandler?
@Bean(name = "xxxconsumer")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
final ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(14);
factory.setBatchListener(true);
factory.setBatchErrorHandler(new BatchLoggingErrorHandler());
return factory;
}
@KafkaListener(
topics = "filler.name.1",
containerFactory = "xxxconsumer"
)
public void receive(@Payload List<String> messages) {
for (int i = 0; i < messages.size(); i++) {
log.info("Received message='{}' ", messages.get(i));
transform(messages.get(i));
}
log.info("All batch messages received");
}
I'm using spring-kafka v2.3.7.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您可以在侦听器中引发
RuntimeException
来模拟错误,并查看BatchLoggingErrorHandler
启动。考虑此应用程序:
将生成以下日志:
You can throw a
RuntimeException
in the listener to simulate an error and see theBatchLoggingErrorHandler
kicking in.Consider this application:
Will produce this logs: