Spring kafka setErrorHandler 已弃用替换(引导 2.6.4)

发布于 2025-01-11 03:46:43 字数 1167 浏览 0 评论 0原文

在 Spring Boot 2.6.4 上,此方法已被弃用。

public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, consumerFactory());

        // deprecated
        factory.setErrorHandler(new GlobalErrorHandler());

        return factory;
    }

全局错误处理程序类

public class GlobalErrorHandler implements ConsumerAwareErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        // my custom global logic (e.g. notify ops team via slack)
    }

}

的替代示例是什么?文档说我应该使用 setCommonErrorHandler ,但是如何实现 CommonErrorHandler 接口,因为那里没有可以重写的方法。

要点是,我必须根据某些条件(消息 tpye,在 kafka 消息头中可用)向运营团队发送松弛通知,

这不是阻塞,只是一条烦人的已弃用消息。 谢谢

On spring boot 2.6.4, this method is deprecated.

public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, consumerFactory());

        // deprecated
        factory.setErrorHandler(new GlobalErrorHandler());

        return factory;
    }

The global error handler class

public class GlobalErrorHandler implements ConsumerAwareErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        // my custom global logic (e.g. notify ops team via slack)
    }

}

What is the replacement sample for this? The doc says I should use setCommonErrorHandler, but how to implements the CommonErrorHandler interface, as no method to be overriden there.

Point is, I have to send slack notification to ops team, based on certain condition (the message tpye, which is available on kafka message header)

This is not blocking, just an annoying deprecated message though.
Thanks

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

緦唸λ蓇 2025-01-18 03:46:44

我面临着完全相同的问题,所以我改变了方法实现 ConsumerAwareErrorHandler

通用错误处理程序

并实现

处理记录

像文档中描述的那样

public class GlobalErrorHandler implements CommonErrorHandler {

  private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

  @Override
  public void handleRecord(
      Exception thrownException,
      ConsumerRecord<?, ?> record,
      Consumer<?, ?> consumer,
      MessageListenerContainer container) {
    log.warn("Global error handler for message: {}", record.value().toString());
  }
}

并且它有效!在KafkaConfig.class中

  @Bean(value = "kafkaListenerContainerFactory")
  public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory());

    factory.setCommonErrorHandler(new GlobalErrorHandler());

    return factory;
  }

I was facing exactly the same problem, so I changed the method implementation ConsumerAwareErrorHandler by

CommonErrorHandler

and implemented

handleRecord

like described in the docs and it works!

public class GlobalErrorHandler implements CommonErrorHandler {

  private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

  @Override
  public void handleRecord(
      Exception thrownException,
      ConsumerRecord<?, ?> record,
      Consumer<?, ?> consumer,
      MessageListenerContainer container) {
    log.warn("Global error handler for message: {}", record.value().toString());
  }
}

In KafkaConfig.class

  @Bean(value = "kafkaListenerContainerFactory")
  public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory());

    factory.setCommonErrorHandler(new GlobalErrorHandler());

    return factory;
  }
地狱即天堂 2025-01-18 03:46:44

请参阅 Spring for Apache Kafka 文档;旧版错误处理程序已替换为 CommonErrorHandler 实现。

有什么新变化?

https: //docs.spring.io/spring-kafka/docs/current/reference/html/#x28-eh

旧版 GenericErrorHandler 及其用于记录批处理侦听器的子接口层次结构已被新的单一接口 CommonErrorHandler 取代,其实现与大多数旧版 GenericErrorHandler 实现相对应。有关详细信息,请参阅容器错误处理程序。

容器错误处理程序

https: //docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

从版本 2.8 开始,旧的 ErrorHandlerBatchErrorHandler 接口已被新的 CommonErrorHandler 取代。这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。提供了 CommonErrorHandler 实现来替换大多数遗留框架错误处理程序实现,并且不推荐使用遗留错误处理程序。侦听器容器和侦听器容器工厂仍然支持旧接口;它们将在未来的版本中被弃用。

See the Spring for Apache Kafka documentation; legacy error handlers are replaced with CommonErrorHandler implementations.

What's New?

https://docs.spring.io/spring-kafka/docs/current/reference/html/#x28-eh

The legacy GenericErrorHandler and its sub-interface hierarchies for record an batch listeners have been replaced by a new single interface CommonErrorHandler with implementations corresponding to most legacy implementations of GenericErrorHandler. See Container Error Handlers for more information.

Container Error Handlers

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

Starting with version 2.8, the legacy ErrorHandler and BatchErrorHandler interfaces have been superseded by a new CommonErrorHandler. These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener. CommonErrorHandler implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated. The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release.

温柔嚣张 2025-01-18 03:46:44

对于使用较新版本的 Spring-Boot 3.0.x 的任何人,以下是基于 @GarryRussel 输入的实现:
从 Spring Boot 2.9 开始,handleRecord() 已被弃用
handleOne() 是它的替代品。

    factory.setCommonErrorHandler(new CommonErrorHandler() {
        @Override
        public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
            return CommonErrorHandler.super.handleOne(thrownException, record, consumer, container);
        }

        @Override
        public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
            CommonErrorHandler.super.handleOtherException(thrownException, consumer, container, batchListener);
        }
    });

For anyone who is on the the newer versions of Spring-Boot 3.0.x, here is the implementation based on @GarryRussel's input:
handleRecord() has been deprecated from Spring Boot 2.9
handleOne() is its replacement.

    factory.setCommonErrorHandler(new CommonErrorHandler() {
        @Override
        public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
            return CommonErrorHandler.super.handleOne(thrownException, record, consumer, container);
        }

        @Override
        public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
            CommonErrorHandler.super.handleOtherException(thrownException, consumer, container, batchListener);
        }
    });
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文