如何处理反应性弹簧积分中的反应性类型?

发布于 2025-01-19 22:27:03 字数 481 浏览 2 评论 0原文

我正在使用反应性弹簧集成进行一些播放,并尝试执行以下基本操作:

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(someReactiveInboundChannelAdapter)
                .handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
    }

但是这不起作用,因为该框架无法识别fluxflux&lt;? &gt;实例。该框架将其视为消息&lt;?&gt;,我不知道如何以及在哪里可以开始编写反应堆代码。

I'm playing a little with Reactive Spring Integration and I try to perform the following basic operation:

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(someReactiveInboundChannelAdapter)
                .handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
    }

But this doesn't work since the framework won't recognize that flux is a Flux<?> instance. The framework treats this as a Message<?> and I don't know how and where can I start to write Reactor code.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

小鸟爱天空丶 2025-01-26 22:27:03

没错。 reactiveVemessageHandlerAdapter期望eacterivevemessagehandler哪个合同是:

Mono<Void> handleMessage(Message<?> message);

我不确定是什么促使您认为输入必须是flux

我假设您的somereActiviNBoundChannelAdapterMessageProducerSupport的扩展,并且确实sisscribetepublisher(发布者&lt;? 。这样做的目的是以反应性的方式从源中获取数据,但仍会产生每个事件作为向下游通道的消息。因此,应该清楚地表明,handle()将接收一件带有单个项目的消息,而不是整个flux

如果您想将流程视为通量,请考虑使用fluxtransform()运算符。

同样,最好不要订阅自己,而是在配置和启动结束时在框架中做到这一点。

从技术上讲,您不应该考虑反应性类型。您只需要分别配置流量并仅为单个项目编写逻辑:框架为您提供反应性交互。项目反应堆是围绕fluxmono的库。这就是我们谈论反应类型的地方。春季集成是一个消息框架,通过消息进行通信。最后,对于每一个消息,端点之间的相互作用是否以反应性方式进行。因此,您的处理逻辑可以没有反应性类型。

udate

如果您希望从该flux完全控制somereActiveInboundChannelAdapter,那么您需要做到这一点:

    @Bean
    public Publisher<Message<Object>> reactiveFlow() {
          return IntegrationFlows.from(someReactiveInboundChannelAdapter)
          .toReactivePublisher();
    }

然后注入该> 出版商只要您需要使用它。然后do flux.from(Publisher)以及您需要的任何反应性操作员,包括subscribe()

其中一些样本在这里: https://github.com/spring-projects/spring-spring-spring-integration/spring-integration/blob/main/main/spring-integration-integration-integration-integration-integration-core-core-in-src/src/src/test/java/java/java/java/org/spremppramewrame-framewarmewame一下/integration/dsl/reaectivestreams/reaectivestreamstests.java

IntegrationFlowAdapter不能用于此类配置,因为它不能接受IntegrationFlow作为IntegrationFlow代码> buildflow()。

但是,fluxtransform()也可以为您做到这一点:

.fluxTransform(flux -> flux.as(Mono::just))

因此,下游流的有效载荷将是flux&lt; message&lt;?您可以应付自己。从该fluxtransform()返回的mono将由框架订阅。它的价值的通量是您在下游流程中的责任。然后,您可以使用普通MessageHandler

.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())

That's correct. The ReactiveMessageHandlerAdapter expects a lambda for the ReactiveMessageHandler which contract is:

Mono<Void> handleMessage(Message<?> message);

I'm not sure what drove you to think that input has to be a Flux.

I assume your someReactiveInboundChannelAdapter is an extension of the MessageProducerSupport and does exactly subscribeToPublisher(Publisher<? extends Message<?>> publisher) logic. The point of this is to take data from the source in reactive manner, but still produce every event as a message to downstream channel. So, that should become clear that a handle() is going to receive a message with a single item, not the whole Flux.

If you want to see the flow as a flux, consider to use a fluxTransform() operator.

Also it is better to not subscribe yourself, but let to do that in the framework, when configuration and startup is over.

Technically you should not think about reactive types. You just need to configure a flow respectively and write a logic only for individual item: the framework does a reactive interaction for you. The Project Reactor is a library around Flux and Mono. That's where we talk about reactive types. The Spring Integration is a messaging framework where its communication is done via messages. And in the end for every single message it must not matter if interaction between endpoints is done in reactive manner or not. Therefore your processing logic can be free from reactive types.

UDATE

If you want the full control of the Flux from that someReactiveInboundChannelAdapter, then you need to do like this:

    @Bean
    public Publisher<Message<Object>> reactiveFlow() {
          return IntegrationFlows.from(someReactiveInboundChannelAdapter)
          .toReactivePublisher();
    }

and then inject that Publisher whenever you need to use it. Then do Flux.from(publisher) and whatever reactive operators you need, including subscribe().

Some sample of that is here: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

The IntegrationFlowAdapter cannot be used for this type of configuration since it cannot accept the IntegrationFlow as a result of the buildFlow().

The fluxTransform() can do that for you as well, though:

.fluxTransform(flux -> flux.as(Mono::just))

So, the payload of downstream flow is going to be a Flux<Message<?>>, which you can handle yourself. The returned Mono from that fluxTransform() is going to be subscribed by the framework. That Flux of its value is your responsibility in the downstream flow. Then you can use the plain MessageHandler:

.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文