Spring Boot Kafka抑制例外
我有一个kafka的听众,如果实现引发异常,它将重试几次。当抛出异常时,将其记录为错误。这污染了我们的哨兵日志。我想将Kafka的消费者错误级别更改为警告级别。
详细信息:
我有一个Kafka的听众,必须重述一些活动。
@KafkaListener(...)
@RetryableTopic(..)
public void consume(ConsumerRecord<?,?> event) {
service.sayHello(...); //this call can throw UnhappyException
}
在不满意的情况下,它将记录下来,因为
> Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException:
> Listener method 'public void
> com.company.hello.world.listener.MyListener.consume(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.Object,
> ....)' threw
> exception; nested exception is
> com.hello.world.somepackage.exceptions.UnhappyException:
> Animal id 4444 is Unhappy
我们的应用程序将尝试再次处理消息(如果发生例外),并且在某些时候,该消息将降落在死信主题中,或者处理。不幸的是,由于我们当前的体系结构,我们的服务中不存在动物ID,因此可能会抛出几次例外,直到进行处理。
该消息被记录为“错误”,并且在Sentry警报平台中创建“ false”警报。
有没有办法将错误不满意的exception更改为警告级别,而无需更改哨兵配置?
我试图查看concurrentkafkalistenerconercontainerfactory,看看我是否可以通过自定义错误处理程序传递,但它根本不会达到断点。
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConsumerFactory<Object, Object> consumerFactory,
KafkaTemplate<String, Object> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
/*
// Original implementation
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));
*/
// here I created CustomErrorHandler and tried to extend DefaultErrorHandler and override methods but they do not get hit by debugger at all.
factory.setCommonErrorHandler(new CustomErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));
I have a kafka listener where if implementation throws exceptions, it will retry a few times. When exception is thrown, it is logged as error. This pollutes our Sentry log. I would like to change Kafka's consumers error level to warn level.
Details:
I have a Kafka listener and some events has to be retried.
@KafkaListener(...)
@RetryableTopic(..)
public void consume(ConsumerRecord<?,?> event) {
service.sayHello(...); //this call can throw UnhappyException
}
In the case UnHappyException is thrown, it is logged as
> Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException:
> Listener method 'public void
> com.company.hello.world.listener.MyListener.consume(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.Object,
> ....)' threw
> exception; nested exception is
> com.hello.world.somepackage.exceptions.UnhappyException:
> Animal id 4444 is Unhappy
Our application will try to process message again (if exception occurs), and at some point the message will either land in a dead letter topic, or be processed. Unfortunately, due to our current architecture, it is common for the animal id to be not present in our service, thus it could throw exceptions a few times until it gets processed.
The message gets logged as "ERROR" and it is creating "false" alerts in the Sentry alert platform.
Is there a way to change the error UnHappyException to warn level, without having to change the sentry configuration?
I tried to look at ConcurrentKafkaListenerContainerFactory and see if I could pass in custom error handler, but it does not hit break point at all.
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConsumerFactory<Object, Object> consumerFactory,
KafkaTemplate<String, Object> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
/*
// Original implementation
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));
*/
// here I created CustomErrorHandler and tried to extend DefaultErrorHandler and override methods but they do not get hit by debugger at all.
factory.setCommonErrorHandler(new CustomErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
事实证明,可重试的主题不会将错误记录为例外,直到登录死信主题,因此不需要采取任何措施。
It turns out retryable topic will not log errors as exception until it hits dead letter topic, so no action was needed.