Spring Boot Kafka抑制例外

发布于 2025-02-13 20:56:45 字数 1979 浏览 1 评论 0原文

我有一个kafka的听众,如果实现引发异常,它将重试几次。当抛出异常时,将其记录为错误。这污染了我们的哨兵日志。我想将Kafka的消费者错误级别更改为警告级别。

详细信息:

我有一个Kafka的听众,必须重述一些活动。

@KafkaListener(...)
@RetryableTopic(..)
public void consume(ConsumerRecord<?,?> event) {
   service.sayHello(...); //this call can throw UnhappyException
}

在不满意的情况下,它将记录下来,因为

> Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException:
> Listener method 'public void
> com.company.hello.world.listener.MyListener.consume(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.Object,
> ....)' threw
> exception; nested exception is
> com.hello.world.somepackage.exceptions.UnhappyException:
> Animal id 4444 is Unhappy

我们的应用程序将尝试再次处理消息(如果发生例外),并且在某些时候,该消息将降落在死信主题中,或者处理。不幸的是,由于我们当前的体系结构,我们的服务中不存在动物ID,因此可能会抛出几次例外,直到进行处理。

该消息被记录为“错误”,并且在Sentry警报平台中创建“ false”警报。

有没有办法将错误不满意的exception更改为警告级别,而无需更改哨兵配置?

我试图查看concurrentkafkalistenerconercontainerfactory,看看我是否可以通过自定义错误处理程序传递,但它根本不会达到断点。

    @Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConsumerFactory<Object, Object> consumerFactory,
        KafkaTemplate<String, Object> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory);

    /*
    // Original implementation
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));
    */

    // here I created CustomErrorHandler and tried to extend DefaultErrorHandler and override methods but they do not get hit by debugger at all.
    factory.setCommonErrorHandler(new CustomErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));

I have a kafka listener where if implementation throws exceptions, it will retry a few times. When exception is thrown, it is logged as error. This pollutes our Sentry log. I would like to change Kafka's consumers error level to warn level.

Details:

I have a Kafka listener and some events has to be retried.

@KafkaListener(...)
@RetryableTopic(..)
public void consume(ConsumerRecord<?,?> event) {
   service.sayHello(...); //this call can throw UnhappyException
}

In the case UnHappyException is thrown, it is logged as

> Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException:
> Listener method 'public void
> com.company.hello.world.listener.MyListener.consume(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.Object,
> ....)' threw
> exception; nested exception is
> com.hello.world.somepackage.exceptions.UnhappyException:
> Animal id 4444 is Unhappy

Our application will try to process message again (if exception occurs), and at some point the message will either land in a dead letter topic, or be processed. Unfortunately, due to our current architecture, it is common for the animal id to be not present in our service, thus it could throw exceptions a few times until it gets processed.

The message gets logged as "ERROR" and it is creating "false" alerts in the Sentry alert platform.

Is there a way to change the error UnHappyException to warn level, without having to change the sentry configuration?

I tried to look at ConcurrentKafkaListenerContainerFactory and see if I could pass in custom error handler, but it does not hit break point at all.

    @Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConsumerFactory<Object, Object> consumerFactory,
        KafkaTemplate<String, Object> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory);

    /*
    // Original implementation
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));
    */

    // here I created CustomErrorHandler and tried to extend DefaultErrorHandler and override methods but they do not get hit by debugger at all.
    factory.setCommonErrorHandler(new CustomErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

爱你不解释 2025-02-20 20:56:45

事实证明,可重试的主题不会将错误记录为例外,直到登录死信主题,因此不需要采取任何措施。

It turns out retryable topic will not log errors as exception until it hits dead letter topic, so no action was needed.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文