Inconsistent responses coming back from the gatherer in Spring Integration

Posted on 2025-02-13 03:52:06

I'm using the scatter-gather pattern and calling 3 sub-flows in parallel, then gathering and aggregating their replies. I'm trying to get the result back after aggregating. I get the response as a List, but the order of the two responses in the list is not consistent: when I call .get(0), sometimes it gives me the 1st service's response and other times the 2nd service's response. Please suggest a way to identify each output so that I can assign it to the correct variable.

Config file

@Configuration
public class IntegrationConfiguration {
  @Autowired LoansServiceImpl loansService;

  long dbId = new SequenceGenerator().nextId();

  //   Main flow
  @Bean
  public IntegrationFlow flow() {
    return flow ->
        flow.split()
            .log()
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .convert(LoanProvisionRequest.class)
            .scatterGather(
                scatterer ->
                    scatterer
                        .applySequence(true)
                        .recipientFlow(flow1())
                        .recipientFlow(flow2())
                        .recipientFlow(flow3()),
                gatherer -> gatherer.releaseLockBeforeSend(true))
            .log()
            .aggregate(a -> a.outputProcessor(MessageGroup::getMessages))
            .channel("output-flow");
  }
  //   flow1
  @Bean
  public IntegrationFlow flow1() {
    return integrationFlowDefination ->
        integrationFlowDefination
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                message -> {
                  try {
                    lionService.saveLionRequest(
                        (LionRequest) message.getPayload(), String.valueOf(dbId));
                  } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                  }
                });
  }

  //   flow2
  @Bean
  public IntegrationFlow flow2() {
    return integrationFlowDefination ->
        integrationFlowDefination
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                message ->
                    lionService.getData(
                        (LionRequest) message.getPayload(), SourceSystem.PROVISION))
            .log();
  }

  //  flow3
  @Bean
  public IntegrationFlow flow3() {
    return integrationFlowDefination ->
        integrationFlowDefination
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                message ->
                    lionService.prepareCDRequest(
                        (LionRequest) message.getPayload()));
  }

  @Bean
  public MessageChannel replyChannel() {
    return MessageChannels.executor("output-flow", outputExecutor()).get();
  }

  @Bean
  public ThreadPoolTaskExecutor outputExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(4);
    pool.setMaxPoolSize(4);
    return pool;
  }
}

Gateway

@MessagingGateway
public interface LionGateway {

  @Gateway(requestChannel = "flow.input", replyChannel = "output-flow")
  List<?> echo(LionRequest lionRequest);
}

Controller

@Autowired private LionGateway lionGateway;

@PostMapping(value = "/invoke-integration")
public String invokeIntegrationFlow(@RequestBody LionRequest lionRequest) {
  String response1 = String.valueOf(lionGateway.echo(lionRequest).get(0));
  String response2 = String.valueOf(lionGateway.echo(lionRequest).get(1));
  System.out.println("response2 " + response2);
  System.out.println("response1 " + response1);

  return "response";
}

In the controller I have two variables, response1 and response2, and their values are inconsistent. Please help me with a better approach.

Comments (1)

叶落知秋 2025-02-20 03:52:06

Since you are using executor channels, the order is simply the order in which the replies come back. The gatherer knows nothing about the downstream topology.
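
To illustrate the point about executor channels, here is a minimal plain-Java sketch (no Spring involved; the class name, method name, and reply strings are made up for illustration). Two tasks run on a thread pool, and a latch forces the task submitted first to reply last, standing in for a slower service. The replies end up in arrival order, not submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration: replies are gathered in the order they arrive.
class CompletionOrderDemo {

  static List<String> gatherInArrivalOrder() {
    List<String> replies = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch flow2Replied = new CountDownLatch(1);
    CountDownLatch allReplied = new CountDownLatch(2);
    ExecutorService pool = Executors.newCachedThreadPool();
    try {
      // Submitted first, but it waits for flow2, like a slower service would.
      pool.execute(() -> {
        try {
          flow2Replied.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        replies.add("reply-from-flow1");
        allReplied.countDown();
      });
      // Submitted second, replies first.
      pool.execute(() -> {
        replies.add("reply-from-flow2");
        flow2Replied.countDown();
        allReplied.countDown();
      });
      // Wait (with a safety timeout) until both replies have been added.
      allReplied.await(5, TimeUnit.SECONDS);
      return new ArrayList<>(replies);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while gathering", e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // prints [reply-from-flow2, reply-from-flow1]
    System.out.println(gatherInArrivalOrder());
  }
}
```

In a real flow nothing forces the order like the latch does here, so it can change from one request to the next.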

You will need to deal with the random order of the reply list in your code.
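
One way to deal with the random order, sketched under assumptions: have each sub-flow label its reply with where it came from, then look replies up by label instead of by index. The `TaggedReply` class and the labels below are hypothetical, and the sketch is plain Java so it runs on its own. In Spring Integration the label could plausibly be carried as a message header set in each recipient flow, but check that against your version before relying on it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch: index replies by a label so list order stops mattering.
class ReplyMapper {

  // Hypothetical wrapper: each sub-flow tags its own reply with a source label.
  static final class TaggedReply {
    final String source;
    final Object payload;

    TaggedReply(String source, Object payload) {
      this.source = source;
      this.payload = payload;
    }
  }

  // Builds a label -> payload lookup; rejects two replies with the same label.
  static Map<String, Object> bySource(List<TaggedReply> replies) {
    Map<String, Object> result = new HashMap<>();
    for (TaggedReply reply : replies) {
      if (result.containsKey(reply.source)) {
        throw new IllegalStateException("Duplicate reply from " + reply.source);
      }
      result.put(reply.source, reply.payload);
    }
    return result;
  }

  public static void main(String[] args) {
    // Replies arrive in an arbitrary order; here flow3 happens to come first.
    List<TaggedReply> replies = List.of(
        new TaggedReply("flow3", "cd-request-reply"),
        new TaggedReply("flow2", "data-reply"));

    Map<String, Object> lookup = bySource(replies);
    System.out.println(lookup.get("flow2")); // prints data-reply
    System.out.println(lookup.get("flow3")); // prints cd-request-reply
  }
}
```

With a lookup like this, the controller would call the gateway once and read both values from that single result; calling `echo` twice sends two separate requests through the flow.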
