Spring Cloud Stream消费者交换连接重试
我有带有反应式 Spring Cloud Stream 的rabbitmq消费者
@Bean
public <T> Consumer<Flux<Message<Subscription>>> exportConsumer() {
return new ExportSubscriptionConsumer<>();
}
配置
spring:
cloud:
function:
definition: exportConsumer
stream:
rabbit:
bindings:
exportConsumer-in-0:
consumer:
bindQueue: true
queueNameGroupOnly: true
declareExchange: false
bindingRoutingKey: smfexportconsumer
function:
bindings:
exportConsumer-in-0:
destination: jms.durable.queues
bindings:
exportConsumer-in-0:
group: smfexportconsumer
,下面是我遇到的
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exportConsumer-in-0' in vhost '9509bf37-8afc-486e-ac98-61034066671d', class-id=50, method-id=20)
问题,在应用程序消费了几条消息后突然抛出了一个错误。如果我重新启动应用程序,它会工作一段时间,但又出现同样的问题! 我是否可以选择在此处配置重试,找不到任何此类配置此处 或者 我可以选择配置我的自定义重试逻辑吗? 请帮忙。
[编辑]
messageFlux
.doOnNext(s -> logger.info("Got message: {}", s))
.map(Message::getPayload)
.flatMap(this::converToCharacters)
.map(User::getId)
.flatMap(this::getUser)
.onErrorContinue((throwable, o) -> logger.error(throwable.getMessage()))
在转换方法内,我有一个 API 调用。
return webClient.get()
.uri(uriBuilderFactory.builder().path("/authorize").build())
.retrieve().bodyToMono(String.class)
.publishOn(Schedulers.immediate())
.timeout(Duration.ofSeconds(10))
.map(s1 -> new User(s, s1));
现在我的问题是,如果我在上述端点上超时,我该如何处理?它使用
onErrorContinue
还是我可以选择全局错误处理?
[编辑2] 示例代码此处
I have rabbitmq consumer with reactive spring cloud stream
@Bean
public <T> Consumer<Flux<Message<Subscription>>> exportConsumer() {
return new ExportSubscriptionConsumer<>();
}
and below are the configurations
spring:
cloud:
function:
definition: exportConsumer
stream:
rabbit:
bindings:
exportConsumer-in-0:
consumer:
bindQueue: true
queueNameGroupOnly: true
declareExchange: false
bindingRoutingKey: smfexportconsumer
function:
bindings:
exportConsumer-in-0:
destination: jms.durable.queues
bindings:
exportConsumer-in-0:
group: smfexportconsumer
Issue i have is it, after application consuming the few messages suddenly it throws me a error.
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exportConsumer-in-0' in vhost '9509bf37-8afc-486e-ac98-61034066671d', class-id=50, method-id=20)
If i restart the application it works for a while but again same issue!
Do i have option to configure retry here, couldn't found any such configurations here
or
Do i have option to configure my custom retry logic.
Kindly help.
[EDIT]
messageFlux
.doOnNext(s -> logger.info("Got message: {}", s))
.map(Message::getPayload)
.flatMap(this::converToCharacters)
.map(User::getId)
.flatMap(this::getUser)
.onErrorContinue((throwable, o) -> logger.error(throwable.getMessage()))
Inside the convert method, I have an API call.
return webClient.get()
.uri(uriBuilderFactory.builder().path("/authorize").build())
.retrieve().bodyToMono(String.class)
.publishOn(Schedulers.immediate())
.timeout(Duration.ofSeconds(10))
.map(s1 -> new User(s, s1));
Now the question I have is if I get a time out with the above endpoint, how do I handle it? is it using
onErrorContinue
or do I have the option of global error handling?
[EDIT-2]
Sample code here
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论