Spring Cloud Kafka Stream StreamsUncaughtExceptionHandler
我正在尝试将 StreamsUncaughtExceptionHandler 添加到我的 Kafka 流处理器中。该处理器是用 Kafka 函数编写的。我查看了 Artem Bilan 提供的建议,将 StreamsUncaughtExceptionHandler 包含到我的服务中,但我的异常从未被捕获/由其处理。
配置Bean:
@Autowired
UnCaughtExceptionHandler exceptionHandler;
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {;
factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
};
}
自定义异常处理程序:
@Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
@Autowired
private StreamBridge streamBridge;
@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
}
流处理函数:
@Autowired
private MyService service;
@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
return kStream -> kStream
.filter((key, value) -> value != null)
.filter((key, value) -> {
Optional<Output> outputResult = service.process(value);
if (outputResult.isPresent()) {
result.set(new KeyValue<>(key, outputResult.get()));
return true;
}
return false;
})
.map((messageKey, messageValue) -> result.get());
}
我希望 UnCaughtExceptionHandler 能够处理 service.process() 方法引发的任何异常。但异常永远不会进入handle方法;相反,它们传播到根并终止客户端。也查看了这个解决方案,但我想以更独立的方式处理它。
问题:如何使用 StreamsUncaughtExceptionHandler 处理任何处理异常?
- Spring boot 版本:2.6.3
- spring-cloud-stream 版本:3.2.1
- spring-cloud-stream-binder-kafka-streams:3.2.1
- kafka-streams:3.0.0
I am trying to add StreamsUncaughtExceptionHandler to my Kafka stream processor. This processor is written with Kafka functions. I had a look at the suggestion provided by Artem Bilan to include the StreamsUncaughtExceptionHandler to my service, but my exceptions never get caught/handled by it.
Config Bean:
@Autowired
UnCaughtExceptionHandler exceptionHandler;
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {;
factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
};
}
Customized exception handler:
@Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
@Autowired
private StreamBridge streamBridge;
@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
}
Stream Process function:
@Autowired
private MyService service;
@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
return kStream -> kStream
.filter((key, value) -> value != null)
.filter((key, value) -> {
Optional<Output> outputResult = service.process(value);
if (outputResult.isPresent()) {
result.set(new KeyValue<>(key, outputResult.get()));
return true;
}
return false;
})
.map((messageKey, messageValue) -> result.get());
}
I expect UnCaughtExceptionHandler to handle any exceptions thrown by the service.process() method. But exceptions never get into the handle method; instead, they propagate to the root and die the client. Had a look at this solution as well, but I want to handle it more independent way.
Question: How do I handle any processing exceptions using StreamsUncaughtExceptionHandler?
- Spring boot version: 2.6.3
- spring-cloud-stream version: 3.2.1
- spring-cloud-stream-binder-kafka-streams: 3.2.1
- kafka-streams: 3.0.0
Reproducible example: spring-cloud-kafka-streams-exception
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您可以尝试以下几项操作。
尝试在 这行在
StreamsBuilderFactoryBean
中,看看configure的值是什么 是。这应该会提供一些线索。我注意到您在配置的实现中为订单设置了
Integer.MAX_VALUE
。默认情况下,StreamsBuilderFactoryBean
使用Integer.MAX_VALUE - 1000
的阶段值,因此当工厂 bean 准备启动时,配置器可能会由于Integer.MAX_VALUE
的优先级较低,因此尚不可用。您可以将顺序更改为Integer.MAX_VALUE - 5000
之类的内容,以确保配置 bean 在工厂 bean 启动之前完全实例化。从这些选项开始,看看它们是否给出了有关该问题的任何指示。如果它仍然存在,请随时与我们分享一个可重复的小示例应用程序。
Here are a couple of things that you can try.
Try to set a breakpoint at this line in
StreamsBuilderFactoryBean
and see what the value of the configure is. That should give some clues.I noticed that you set
Integer.MAX_VALUE
for the order in your configured impl. By default,StreamsBuilderFactoryBean
uses a phase value ofInteger.MAX_VALUE - 1000
, so there might be a chance that by the time the factory bean is ready to start, the configurer may not be available yet sinceInteger.MAX_VALUE
has lower precedence. You can change your order to something likeInteger.MAX_VALUE - 5000
to ensure that the configure bean is fully instantiated before the factory bean is started.Start with those options and see if they give any indications for the issue. If it still persists, feel free to share with us a reproducible small sample application.