Spring Cloud Stream消费者交换连接重试

发布于 2025-01-12 02:34:04 字数 2331 浏览 5 评论 0原文

我有带有反应式 Spring Cloud Stream 的rabbitmq消费者

@Bean
public <T> Consumer<Flux<Message<Subscription>>> exportConsumer() {
    return new ExportSubscriptionConsumer<>();
}

配置

spring:
cloud:
    function:
        definition: exportConsumer
    stream:
        rabbit:
            bindings:
                exportConsumer-in-0:
                    consumer:
                        bindQueue: true
                        queueNameGroupOnly: true
                        declareExchange: false
                        bindingRoutingKey: smfexportconsumer
        function:
            bindings:
                exportConsumer-in-0:
                    destination: jms.durable.queues
        bindings:
            exportConsumer-in-0:
                group: smfexportconsumer

,下面是我遇到的

Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exportConsumer-in-0' in vhost '9509bf37-8afc-486e-ac98-61034066671d', class-id=50, method-id=20)

问题,在应用程序消费了几条消息后突然抛出了一个错误。如果我重新启动应用程序,它会工作一段时间,但又出现同样的问题! 我是否可以选择在此处配置重试,找不到任何此类配置此处 或者 我可以选择配置我的自定义重试逻辑吗? 请帮忙。

[编辑]

messageFlux
            .doOnNext(s -> logger.info("Got message: {}", s))
            .map(Message::getPayload)
            .flatMap(this::converToCharacters)
            .map(User::getId)
            .flatMap(this::getUser)
           .onErrorContinue((throwable, o) -> logger.error(throwable.getMessage()))

在转换方法内,我有一个 API 调用。

return webClient.get()
            .uri(uriBuilderFactory.builder().path("/authorize").build())
            .retrieve().bodyToMono(String.class)
            .publishOn(Schedulers.immediate())
            .timeout(Duration.ofSeconds(10))
            .map(s1 -> new User(s, s1));

现在我的问题是,如果我在上述端点上超时,我该如何处理?它使用

onErrorContinue

还是我可以选择全局错误处理?

[编辑2] 示例代码此处

I have rabbitmq consumer with reactive spring cloud stream

@Bean
public <T> Consumer<Flux<Message<Subscription>>> exportConsumer() {
    return new ExportSubscriptionConsumer<>();
}

and below are the configurations

spring:
cloud:
    function:
        definition: exportConsumer
    stream:
        rabbit:
            bindings:
                exportConsumer-in-0:
                    consumer:
                        bindQueue: true
                        queueNameGroupOnly: true
                        declareExchange: false
                        bindingRoutingKey: smfexportconsumer
        function:
            bindings:
                exportConsumer-in-0:
                    destination: jms.durable.queues
        bindings:
            exportConsumer-in-0:
                group: smfexportconsumer

Issue i have is it, after application consuming the few messages suddenly it throws me a error.

Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exportConsumer-in-0' in vhost '9509bf37-8afc-486e-ac98-61034066671d', class-id=50, method-id=20)

If i restart the application it works for a while but again same issue!
Do i have option to configure retry here, couldn't found any such configurations here
or
Do i have option to configure my custom retry logic.
Kindly help.

[EDIT]

messageFlux
            .doOnNext(s -> logger.info("Got message: {}", s))
            .map(Message::getPayload)
            .flatMap(this::converToCharacters)
            .map(User::getId)
            .flatMap(this::getUser)
           .onErrorContinue((throwable, o) -> logger.error(throwable.getMessage()))

Inside the convert method, I have an API call.

return webClient.get()
            .uri(uriBuilderFactory.builder().path("/authorize").build())
            .retrieve().bodyToMono(String.class)
            .publishOn(Schedulers.immediate())
            .timeout(Duration.ofSeconds(10))
            .map(s1 -> new User(s, s1));

Now the question I have is if I get a time out with the above endpoint, how do I handle it? is it using

onErrorContinue

or do I have the option of global error handling?

[EDIT-2]
Sample code here

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文