Spring Cloud Kafka Stream StreamsUncaughtExceptionHandler

发布于 2025-01-10 21:40:23 字数 2579 浏览 0 评论 0原文

我正在尝试将 StreamsUncaughtExceptionHandler 添加到我的 Kafka 流处理器中。该处理器是用 Kafka 函数编写的。我查看了 Artem Bilan 提供的建议,将 StreamsUncaughtExceptionHandler 包含到我的服务中,但我的异常从未被捕获/由其处理。

配置Bean:

@Autowired
UnCaughtExceptionHandler exceptionHandler;

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
    return new StreamsBuilderFactoryBeanConfigurer() {

        @Override
        public void configure(StreamsBuilderFactoryBean factoryBean) {;
            factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
        }

        @Override
        public int getOrder() {
            return Integer.MAX_VALUE;
        }

    };
}

自定义异常处理程序:

    @Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

  @Autowired
  private StreamBridge streamBridge;

  @Override
  public StreamThreadExceptionResponse handle(Throwable exception) {
    return StreamThreadExceptionResponse.REPLACE_THREAD;
  }
}

流处理函数:

@Autowired
private MyService service;

@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
    final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
    return kStream -> kStream
            .filter((key, value) -> value != null)
            .filter((key, value) -> {
                Optional<Output> outputResult = service.process(value);
                if (outputResult.isPresent()) {
                    result.set(new KeyValue<>(key, outputResult.get()));
                    return true;
                }
                return false;
            })
        .map((messageKey, messageValue) -> result.get());
}

我希望 UnCaughtExceptionHandler 能够处理 service.process() 方法引发的任何异常。但异常永远不会进入handle方法;相反,它们传播到根并终止客户端。也查看了这个解决方案,但我想以更独立的方式处理它。

问题:如何使用 StreamsUncaughtExceptionHandler 处理任何处理异常?

  • Spring boot 版本:2.6.3
  • spring-cloud-stream 版本:3.2.1
  • spring-cloud-stream-binder-kafka-streams:3.2.1
  • kafka-streams:3.0.0

可重现示例:spring-cloud-kafka-streams-异常

I am trying to add StreamsUncaughtExceptionHandler to my Kafka stream processor. This processor is written with Kafka functions. I had a look at the suggestion provided by Artem Bilan to include the StreamsUncaughtExceptionHandler to my service, but my exceptions never get caught/handled by it.

Config Bean:

@Autowired
UnCaughtExceptionHandler exceptionHandler;

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
    return new StreamsBuilderFactoryBeanConfigurer() {

        @Override
        public void configure(StreamsBuilderFactoryBean factoryBean) {;
            factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
        }

        @Override
        public int getOrder() {
            return Integer.MAX_VALUE;
        }

    };
}

Customized exception handler:

    @Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

  @Autowired
  private StreamBridge streamBridge;

  @Override
  public StreamThreadExceptionResponse handle(Throwable exception) {
    return StreamThreadExceptionResponse.REPLACE_THREAD;
  }
}

Stream Process function:

@Autowired
private MyService service;

@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
    final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
    return kStream -> kStream
            .filter((key, value) -> value != null)
            .filter((key, value) -> {
                Optional<Output> outputResult = service.process(value);
                if (outputResult.isPresent()) {
                    result.set(new KeyValue<>(key, outputResult.get()));
                    return true;
                }
                return false;
            })
        .map((messageKey, messageValue) -> result.get());
}

I expect UnCaughtExceptionHandler to handle any exceptions thrown by the service.process() method. But exceptions never get into the handle method; instead, they propagate to the root and die the client. Had a look at this solution as well, but I want to handle it more independent way.

Question: How do I handle any processing exceptions using StreamsUncaughtExceptionHandler?

  • Spring boot version: 2.6.3
  • spring-cloud-stream version: 3.2.1
  • spring-cloud-stream-binder-kafka-streams: 3.2.1
  • kafka-streams: 3.0.0

Reproducible example: spring-cloud-kafka-streams-exception

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

不知所踪 2025-01-17 21:40:23

您可以尝试以下几项操作。

  1. 尝试在 这行StreamsBuilderFactoryBean中,看看configure的值是什么 是。这应该会提供一些线索。

  2. 我注意到您在配置的实现中为订单设置了Integer.MAX_VALUE。默认情况下,StreamsBuilderFactoryBean 使用 Integer.MAX_VALUE - 1000 的阶段值,因此当工厂 bean 准备启动时,配置器可能会由于 Integer.MAX_VALUE 的优先级较低,因此尚不可用。您可以将顺序更改为 Integer.MAX_VALUE - 5000 之类的内容,以确保配置 bean 在工厂 bean 启动之前完全实例化。

从这些选项开始,看看它们是否给出了有关该问题的任何指示。如果它仍然存在,请随时与我们分享一个可重复的小示例应用程序。

Here are a couple of things that you can try.

  1. Try to set a breakpoint at this line in StreamsBuilderFactoryBean and see what the value of the configure is. That should give some clues.

  2. I noticed that you set Integer.MAX_VALUE for the order in your configured impl. By default, StreamsBuilderFactoryBean uses a phase value of Integer.MAX_VALUE - 1000, so there might be a chance that by the time the factory bean is ready to start, the configurer may not be available yet since Integer.MAX_VALUE has lower precedence. You can change your order to something like Integer.MAX_VALUE - 5000 to ensure that the configure bean is fully instantiated before the factory bean is started.

Start with those options and see if they give any indications for the issue. If it still persists, feel free to share with us a reproducible small sample application.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文