如何处理反应性弹簧积分中的反应性类型?
我正在使用反应性弹簧集成进行一些播放,并尝试执行以下基本操作:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(someReactiveInboundChannelAdapter)
.handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
}
但是这不起作用,因为该框架无法识别flux
是flux&lt;? &gt;
实例。该框架将其视为消息&lt;?&gt;
,我不知道如何以及在哪里可以开始编写反应堆代码。
I'm playing a little with Reactive Spring Integration and I try to perform the following basic operation:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(someReactiveInboundChannelAdapter)
.handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
}
But this doesn't work since the framework won't recognize that flux
is a Flux<?>
instance. The framework treats this as a Message<?>
and I don't know how and where can I start to write Reactor code.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
没错。
reactiveVemessageHandlerAdapter
期望eacterivevemessagehandler
哪个合同是:我不确定是什么促使您认为输入必须是
flux
。我假设您的
somereActiviNBoundChannelAdapter
是MessageProducerSupport
的扩展,并且确实sisscribetepublisher(发布者&lt;? 。这样做的目的是以反应性的方式从源中获取数据,但仍会产生每个事件作为向下游通道的消息。因此,应该清楚地表明,
handle()
将接收一件带有单个项目的消息,而不是整个flux
。如果您想将流程视为通量,请考虑使用
fluxtransform()
运算符。同样,最好不要订阅自己,而是在配置和启动结束时在框架中做到这一点。
从技术上讲,您不应该考虑反应性类型。您只需要分别配置流量并仅为单个项目编写逻辑:框架为您提供反应性交互。项目反应堆是围绕
flux
和mono
的库。这就是我们谈论反应类型的地方。春季集成是一个消息框架,通过消息进行通信。最后,对于每一个消息,端点之间的相互作用是否以反应性方式进行。因此,您的处理逻辑可以没有反应性类型。udate
如果您希望从该
flux
完全控制somereActiveInboundChannelAdapter
,那么您需要做到这一点:然后注入该
>
出版商
只要您需要使用它。然后doflux.from(Publisher)
以及您需要的任何反应性操作员,包括subscribe()
。其中一些样本在这里: https://github.com/spring-projects/spring-spring-spring-integration/spring-integration/blob/main/main/spring-integration-integration-integration-integration-integration-core-core-in-src/src/src/test/java/java/java/java/org/spremppramewrame-framewarmewame一下/integration/dsl/reaectivestreams/reaectivestreamstests.java
IntegrationFlowAdapter
不能用于此类配置,因为它不能接受IntegrationFlow
作为IntegrationFlow
代码> buildflow()。但是,
fluxtransform()
也可以为您做到这一点:因此,下游流的有效载荷将是
flux&lt; message&lt;?您可以应付自己。从该
fluxtransform()
返回的mono
将由框架订阅。它的价值的通量
是您在下游流程中的责任。然后,您可以使用普通MessageHandler
:That's correct. The
ReactiveMessageHandlerAdapter
expects a lambda for theReactiveMessageHandler
which contract is:I'm not sure what drove you to think that input has to be a
Flux
.I assume your
someReactiveInboundChannelAdapter
is an extension of theMessageProducerSupport
and does exactlysubscribeToPublisher(Publisher<? extends Message<?>> publisher)
logic. The point of this is to take data from the source in reactive manner, but still produce every event as a message to downstream channel. So, that should become clear that ahandle()
is going to receive a message with a single item, not the wholeFlux
.If you want to see the flow as a flux, consider to use a
fluxTransform()
operator.Also it is better to not subscribe yourself, but let to do that in the framework, when configuration and startup is over.
Technically you should not think about reactive types. You just need to configure a flow respectively and write a logic only for individual item: the framework does a reactive interaction for you. The Project Reactor is a library around
Flux
andMono
. That's where we talk about reactive types. The Spring Integration is a messaging framework where its communication is done via messages. And in the end for every single message it must not matter if interaction between endpoints is done in reactive manner or not. Therefore your processing logic can be free from reactive types.UDATE
If you want the full control of the
Flux
from thatsomeReactiveInboundChannelAdapter
, then you need to do like this:and then inject that
Publisher
whenever you need to use it. Then doFlux.from(publisher)
and whatever reactive operators you need, includingsubscribe()
.Some sample of that is here: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
The
IntegrationFlowAdapter
cannot be used for this type of configuration since it cannot accept theIntegrationFlow
as a result of thebuildFlow()
.The
fluxTransform()
can do that for you as well, though:So, the payload of downstream flow is going to be a
Flux<Message<?>>
, which you can handle yourself. The returnedMono
from thatfluxTransform()
is going to be subscribed by the framework. ThatFlux
of its value is your responsibility in the downstream flow. Then you can use the plainMessageHandler
: