春季云流合并响应来自两个不同的功能

发布于 2025-01-20 16:09:04 字数 1936 浏览 3 评论 0原文

我正在尝试使用Spring Cloud Stream解决以下问题:

“在此处输入图像说明”

我有一个调用两个分离函数(函数a和b)的类,如果功能,则两个函数都必须并行工作a完成它必须调用函数c,如果函数b完成,则会发生同样的情况,但这会调用函数d,然后我需要等待函数c和函数d才能完成并在单个响应中合并两个响应,然后返回此合并后响应对象必须等待获得该响应的开始类。

我遇到的问题是:

  • 如何调用函数C将函数传递给响应?
  • 如何等到功能C和功能D完成并在功能e中获得响应?
  • 如何等待控制器中函数e的响应,我正在使用streambridge.send同时启动功能A和功能B。

我正在使用

  • spring-cloud-stream-3.1.3
  • spring-cloud-stream-binder-rabbit 必需

我不能使用kafka 必需

示例代码

serviceClass

@Service
@RequiredArgsConstructor
public class ServiceClass {

    @NonNull
    private final StreamBridge streamBridge;

    @Override
    protected MergedResponse execute(Input input) {
        var send1 = streamBridge.send("functionA-in-0", input);
        var send2 = streamBridge.send("functionB-in-0", input);

        //TODO: Wait for Function E response object
    }
}

函数a

@Slf4j
@Configuration
public class FunctionAClass{

    @Bean
    public Function<Input, OutputFunctionA> functionA() {
        return input -> {
            //TODO: Invoke Function C to pass OutputFunctionA object
            return OutputFunctionA.builder.build();
        };
    }
}

我不介意使用供应商< /code>或消费者而不是函数

编辑 嗨,@Oleg Zhurakousky感谢您的帮助,回答您的问题我的问题是:我必须创建一个消耗n不同的第三方休息端点的休息端点(首先是两个,async是必须的由于依次处理每个请求的速度太慢),因此我不需要它们的所有数据,只有几个字段来构建一个共同的对象。我打算使用streambridge来启动前两个功能,以构建每个第三方的要求请求,然后构建一个函数来调用其每个端点,然后一个函数来构建一个常见对象具有每个响应,最后一个函数收集Commons对象并将其发送给我服务的响应中。让我知道您是否还有另一个问题。

问候。

I'm trying to use spring cloud stream to solve the following problem:

enter image description here

I have a class that calls two separated functions (Function A and B), both of those functions must work in parallel if the Function A finishes it must call the Function C, the same happens if Function B finish but this will call Function D, then I need to wait for Function C and Function D to finish and merge both responses in a single response, and then return this merged response object to the starting class that must be waiting to get that response.

The problems I have are:

  • How do I call Function C to pass the Function A response?
  • How to wait until Function C and Function D finish and get their responses in Function E?
  • How to wait for the response of Function E in the controller, I'm using streamBridge.send which to start Function A and Function B at the same time.

I'm using

  • spring-cloud-stream-3.1.3
  • spring-cloud-stream-binder-rabbit Required

I cannot use Kafka Required

Sample code

ServiceClass

@Service
@RequiredArgsConstructor
public class ServiceClass {

    @NonNull
    private final StreamBridge streamBridge;

    @Override
    protected MergedResponse execute(Input input) {
        var send1 = streamBridge.send("functionA-in-0", input);
        var send2 = streamBridge.send("functionB-in-0", input);

        //TODO: Wait for Function E response object
    }
}

Function A

@Slf4j
@Configuration
public class FunctionAClass{

    @Bean
    public Function<Input, OutputFunctionA> functionA() {
        return input -> {
            //TODO: Invoke Function C to pass OutputFunctionA object
            return OutputFunctionA.builder.build();
        };
    }
}

I don't mind using Supplier or Consumer instead of Function.

Edit
Hi, @Oleg Zhurakousky thanks for your help, to answer your question my problem is: I have to create a REST endpoint that consume N different third-party REST endpoints (two at first, async is a must as it will be too slow to process every request sequentially) I don't need all the data from them, just a few fields to build a common object. I'm planning to used streamBridge to start the first two functions that will to build the required request for each third-party, then a function to invoke each of their endpoints, then a function to build a common object with each response, and finally a function that collects the commons objects and send them in the response of my service. Let me know if you have another question.

Regards.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

牵你的手,一向走下去 2025-01-27 16:09:04

有几点。

  1. 由于您引入了一个同步点,因此无论您尝试什么,最终都会遇到阻塞调用,因为同步(例如您的聚合要求)必须等待两个响应,然后进行关联等。最重要的是,存在一个问题说明在聚合过程中发生系统崩溃时您将要处理的情况以及如何恢复这种状态。
  2. 聚合并不是 sc-stream 的真正用例,因此我们没有基于框架的支持。我会考虑使用 Spring Integration 框架为两个以上不同的端点创建管道,然后使用[聚合器模式](聚合器模式支持)进行聚合,然后使用 StreamBridge 将数据发送到目标目的地。

Couple of points.

  1. Since you are introducing a synchronization point you will have in the end a blocking call no matter what you try since synchronization (such as your aggregation requirement) would have to wait, for two responses, correlate then etc. On top there is an issue of state that you are going to deal with and how to recover such state in the event of a system crash during aggregation.
  2. Aggregation is not really a use case for s-c-stream so we don't have a framework-based support for it. I would consider using Spring Integration framework to create pipelines for two+ different endpoints, aggregate then using [Aggregator pattern](aggregator pattern support ) and then use StreamBridge to send data out to target destination.
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文