Inconsistent responses returned from the gatherer in Spring Integration
I'm using the scatter-gather pattern and calling three sub-flows in parallel, then gathering and aggregating their replies. I'm trying to get the result back after aggregation. I get the response as a List, but the order of the two responses in the list is not consistent. When I call .get(0), sometimes it gives me the first service's response and other times the second service's response. Kindly suggest a way to tell the replies apart so that I can assign each one to the right variable.
Config file
@Configuration
public class IntegrationConfiguration {

  @Autowired LoansServiceImpl lionService;
  long dbId = new SequenceGenerator().nextId();

  // Main flow
  @Bean
  public IntegrationFlow flow() {
    return flow ->
        flow.split()
            .log()
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .convert(LoanProvisionRequest.class)
            .scatterGather(
                scatterer ->
                    scatterer
                        .applySequence(true)
                        .recipientFlow(flow1())
                        .recipientFlow(flow2())
                        .recipientFlow(flow3()),
                gatherer -> gatherer.releaseLockBeforeSend(true))
            .log()
            .aggregate(a -> a.outputProcessor(MessageGroup::getMessages))
            .channel("output-flow");
  }

  // flow1
  @Bean
  public IntegrationFlow flow1() {
    return integrationFlowDefination ->
        integrationFlowDefination
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                message -> {
                  try {
                    lionService.saveLionRequest(
                        (LionRequest) message.getPayload(), String.valueOf(dbId));
                  } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                  }
                });
  }

  // flow2
  @Bean
  public IntegrationFlow flow2() {
    return integrationFlowDefination ->
        integrationFlowDefination
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                message ->
                    lionService.getData(
                        (LionRequest) message.getPayload(), SourceSystem.PROVISION))
            .log();
  }

  // flow3
  @Bean
  public IntegrationFlow flow3() {
    return integrationFlowDefination ->
        integrationFlowDefination
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                message ->
                    lionService.prepareCDRequest(
                        (LionRequest) message.getPayload()));
  }

  @Bean
  public MessageChannel replyChannel() {
    return MessageChannels.executor("output-flow", outputExecutor()).get();
  }

  @Bean
  public ThreadPoolTaskExecutor outputExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(4);
    pool.setMaxPoolSize(4);
    return pool;
  }
}
Gateway
@MessagingGateway
public interface LionGateway {

  @Gateway(requestChannel = "flow.input", replyChannel = "output-flow")
  List<?> echo(LionRequest lionRequest);
}
Controller
@Autowired private LionGateway lionGateway;

@PostMapping(value = "/invoke-integration")
public String invokeIntegrationFlow(@RequestBody LionRequest lionRequest) {
  String response1 = lionGateway.echo(lionRequest).get(0);
  String response2 = lionGateway.echo(lionRequest).get(1);
  System.out.println("response2 " + response2);
  System.out.println("response1 " + response1);
  return "response";
}
In the controller I have two variables, response1 and response2, and their values come back inconsistently. Please help me with a better approach.
Comments (1)
Since you are using executor channels, the order is simply the order in which the replies come back. The gatherer knows nothing about the downstream topology.
You will need to deal with the random order of the reply list in your code.
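One way to deal with the random order, sketched below in plain Java, is to stop relying on list positions and instead assign each reply by its runtime type. The types ProvisionData and CdRequest and the class ReplyClassifier are hypothetical stand-ins for whatever getData(...) and prepareCDRequest(...) really return; the sketch also assumes the list holds the reply payloads themselves. If it holds Message objects, as it may with MessageGroup::getMessages, the same check would apply to each message's getPayload().

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: none of these types come from the original post.
final class ReplyClassifier {

    // Stand-in for the value returned by getData(...).
    static final class ProvisionData {
        final String value;
        ProvisionData(String value) { this.value = value; }
    }

    // Stand-in for the value returned by prepareCDRequest(...).
    static final class CdRequest {
        final String value;
        CdRequest(String value) { this.value = value; }
    }

    // Gives each reply a named slot instead of a list position.
    static final class Replies {
        ProvisionData provisionData; // stays null if no such reply arrived
        CdRequest cdRequest;         // stays null if no such reply arrived
    }

    // Assigns each gathered reply by its runtime type, ignoring list order.
    static Replies classify(List<?> gathered) {
        Replies replies = new Replies();
        for (Object reply : gathered) {
            if (reply instanceof ProvisionData) {
                replies.provisionData = (ProvisionData) reply;
            } else if (reply instanceof CdRequest) {
                replies.cdRequest = (CdRequest) reply;
            } else {
                throw new IllegalArgumentException("Unexpected reply type: " + reply);
            }
        }
        return replies;
    }

    public static void main(String[] args) {
        ProvisionData data = new ProvisionData("provision-data");
        CdRequest cd = new CdRequest("cd-request");

        // The same two replies in both possible arrival orders.
        Replies first = classify(Arrays.asList(data, cd));
        Replies second = classify(Arrays.asList(cd, data));

        System.out.println(first.provisionData.value + " / " + first.cdRequest.value);
        System.out.println(second.provisionData.value + " / " + second.cdRequest.value);
    }
}
```

In the controller this would mean calling the gateway once, passing the returned list to classify(...), and reading the named fields rather than calling .get(0) and .get(1). This only works when the sub-flows return distinguishable types; if two of them return the same type, some other marker on the reply would be needed.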