Spring Integration channel works the first time, then not the second
When I use a channel in Spring Integration to hand a message to another integration flow, it only works the first time.
After that, the message skips over the channel and returns.
From the first integration flow:
    .handle((p, h) -> {
        System.out.println("Payload Before Channel" + p.toString());
        return p;
    })
    .channel(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName())
    .handle((p, h) -> {
        System.out.println("Payload After Channel" + p.toString());
        return p;
    })
Then in the next integration flow:
    @Bean
    public IntegrationFlow jamsSubmitJob() {
        return IntegrationFlows.from(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName())
                .handle((p, h) -> {
                    try {
                        jamsToken = authMang.getJamsAuth().getTokenWithTokenType();
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    JAMS_SUBMIT_JOB_INTGRTN
                            .info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Integration called.");
                    JAMS_SUBMIT_JOB_INTGRTN.info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Headers:= " + h);
                    JAMS_SUBMIT_JOB_INTGRTN.debug(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Payload:= " + p);
                    return p;
                })
                .handle((p, h) -> {
                    // hail mary to get new token
                    return MessageBuilder
                            .withPayload(p)
                            .removeHeaders("*")
                            .setHeader(HttpHeaders.AUTHORIZATION.toLowerCase(), jamsToken)
                            .setHeader(HttpHeaders.CONTENT_TYPE.toLowerCase(), "application/json")
                            .build();
                })
                .handle((p, h) -> {
                    JAMS_SUBMIT_JOB_INTGRTN
                            .info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Submitting payload to JAMS:");
                    JAMS_SUBMIT_JOB_INTGRTN.info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Headers:= " + h);
                    JAMS_SUBMIT_JOB_INTGRTN.info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Payload:= " + p);
                    return p;
                })
                .handle(Http.outboundGateway(JAMS_SUBMIT_ENDPOINT)
                        .requestFactory(alliantPooledHttpConnection.get_httpComponentsClientHttpRequestFactory())
                        .httpMethod(HttpMethod.POST)
                        .expectedResponseType(String.class)
                        .extractPayload(true))
                .logAndReply();
    }
The behavior is that only every other message gets through to the jamsSubmitJob flow; the rest skip over the channel until the next time around. Strangely, if I duplicate the jamsSubmitJob bean, it works twice, then fails, then starts over again.
Thanks!
Comments (1)
This is a misunderstanding of what a channel is and what a subscriber to that channel is.
So, in the first flow you have this:
    .channel(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName())
    .handle(...)
This way you declare a channel (if it does not exist in the application context yet) and a subscriber to it.
Then, in the second flow, you have this:
    IntegrationFlows.from(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName())
    .handle(...)
This way you declare a channel (if it does not exist in the application context yet) and a subscriber to it.
Did you notice the duplication in my explanation? I did that deliberately to draw your attention to the problem.
So, if the channel does not exist, it is created; in the other place, the existing object is used. You end up with two subscribers to the same channel. By default the framework creates an instance of DirectChannel, which comes with a round-robin dispatching strategy. That means the first message goes to the first subscriber, the second to the second, the third to the first again, and so on.
What you want is probably a request-reply pattern, so you should look at using
    .gateway(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName())
instead of .channel() in your first IntegrationFlow.
See more info in the docs:
https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-directchannel
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway
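
To make the round-robin effect concrete, here is a minimal plain-Java sketch of a channel with two subscribers. It is a toy model, not Spring's DirectChannel: the class name RoundRobinChannelSketch, the simulate helper, and the subscriber order (the jamsSubmitJob flow subscribing first) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: this is NOT Spring's DirectChannel, just the round-robin idea.
class RoundRobinChannelSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int next = 0; // index of the subscriber that receives the next message

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Each message is delivered to exactly one subscriber, chosen in rotation.
    void send(String message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        Consumer<String> target = subscribers.get(next);
        next = (next + 1) % subscribers.size();
        target.accept(message);
    }

    // Mirrors the question: one named channel that ends up with two subscribers.
    static List<String> simulate(int messageCount) {
        List<String> log = new ArrayList<>();
        RoundRobinChannelSketch channel = new RoundRobinChannelSketch();
        // Assumed first subscriber: the flow started with IntegrationFlows.from(...).
        channel.subscribe(m -> log.add(m + " -> jamsSubmitJob flow"));
        // Assumed second subscriber: the handler that follows .channel(...) in the first flow.
        channel.subscribe(m -> log.add(m + " -> after-channel handler"));
        for (int i = 1; i <= messageCount; i++) {
            channel.send("msg" + i);
        }
        return log;
    }

    public static void main(String[] args) {
        simulate(4).forEach(System.out::println);
    }
}
```

In this model, msg1 and msg3 reach the jamsSubmitJob flow while msg2 and msg4 go straight to the after-channel handler, which matches the "every other message" symptom. Under the same model, a third subscriber (a duplicated jamsSubmitJob bean) would take two of every three messages, which is consistent with the "works twice, then fails" observation.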