如何在现实世界的 JMS 分布式架构中利用 Spring Integration?

发布于 2024-09-04 16:09:10 字数 462 浏览 10 评论 0原文

对于以下场景,我正在寻求您有关最佳实践的建议和提示:

在分布式(主要基于 Java)系统中,具有:

  • 许多(不同的)客户端应用程序(Web 应用程序、命令行工具、REST API)
  • 中央 JMS消息代理(目前支持使用 ActiveMQ)
  • 多个独立处理节点(在多台远程计算机上运行,​​计算由 JMS 消息有效负载指定的不同类型的昂贵操作)

如何最好地应用 Spring Integration 框架将客户端与工作节点解耦?当阅读参考文档和一些最初的实验时,看起来 JMS 入站适配器的配置本质上需要使用订阅者,而在解耦场景中不存在订阅者。

小旁注:通信应通过 JMS 文本消息进行(使用 JSON 数据结构以实现未来的可扩展性)。

For the following scenario I am looking for your advices and tips on best practices:

In a distributed (mainly Java-based) system with:

  • many (different) client applications (web-app, command-line tools, REST API)
  • a central JMS message broker (currently in favor of using ActiveMQ)
  • multiple stand-alone processing nodes (running on multiple remote machines, computing expensive operations of different types as specified by the JMS message payload)

How would one best apply the JMS support provided by the Spring Integration framework to decouple the clients from the worker nodes? When reading through the reference documentation and some very first experiments it looks like the configuration of an JMS inbound adapter inherently require to use a subscriber, which in a decoupled scenario does not exist.

Small side note: communication should happen via JMS text messages (using a JSON data structure for future extensibility).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

静赏你的温柔 2024-09-11 16:09:10

这并不能真正回答您的问题,但请确保您研究 Apache Camel 来连接不同的组件。我发现它对于将 JMS 队列连接到现有 Web 服务非常有用,并计划也将其用于其他组件。

一个监视 ActiveMQ 队列中的消息、转换它们并将它们发布到 Web 服务的示例:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:camel="http://camel.apache.org/schema/spring"
   xsi:schemaLocation="
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
      http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring-2.3.0.xsd">

<bean id="callbackProcessor" class="com.package.CallbackProcessor"/>

<bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="jmsFactory" />
</bean>

<camel:camelContext id="camel">
    <!-- Must put this in camel:endpoint because camel:from doesn't support property substitution -->
    <camel:endpoint id="callbackQueue" uri="activemq:queue:${jms.callback-queue-name}"/>
    <camel:route>
        <camel:from ref="callbackQueue"/>
        <camel:process ref="callbackProcessor"/>
        <camel:to uri="http://dummy"/><!-- This will be replaced by the callbackProcessor with the callback URL in the message -->
    </camel:route>
</camel:camelContext>
</beans>

这就是我们的 Spring 应用程序中启动 Camel 并开始处理消息所需的全部内容。

This doesn't really answer your question, but make sure you look into Apache Camel for connecting your different components. I found it extremely useful for connecting a JMS queue up to an existing web service and plan to use it for other components also.

An example that monitors an ActiveMQ queue for messages, transforms them, and posts them to a web service:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:camel="http://camel.apache.org/schema/spring"
   xsi:schemaLocation="
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
      http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring-2.3.0.xsd">

<bean id="callbackProcessor" class="com.package.CallbackProcessor"/>

<bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="jmsFactory" />
</bean>

<camel:camelContext id="camel">
    <!-- Must put this in camel:endpoint because camel:from doesn't support property substitution -->
    <camel:endpoint id="callbackQueue" uri="activemq:queue:${jms.callback-queue-name}"/>
    <camel:route>
        <camel:from ref="callbackQueue"/>
        <camel:process ref="callbackProcessor"/>
        <camel:to uri="http://dummy"/><!-- This will be replaced by the callbackProcessor with the callback URL in the message -->
    </camel:route>
</camel:camelContext>
</beans>

That's all that's necessary in our Spring application to fire up Camel and start processing messages.

薯片软お妹 2024-09-11 16:09:10

这是我今天提出的 Spring Integration,如果您发现可以改进的地方,请跟进。

在客户端,消息可以通过 SimpleMessagingGateway 发送和接收:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"   
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"   
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <import resource="integration-common.xml"/>

    <!-- Communication Gateway for the Client (send/receive) -->
    <bean id="gateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
        <property name="requestChannel" ref="SenderChannel"/>
        <property name="replyChannel" ref="InboundChannel"/>
        <property name="replyTimeout" value="1000"/>
    </bean><!-- TODO: could use integration:gateway -->

    <!-- Sending out message to JMS request queue -->
    <integration:channel id="SenderChannel"/>
    <jms:outbound-channel-adapter
                        channel="SenderChannel" 
                        destination="requestQueue" />

    <!-- Listen to incoming messages on JMS reply queue -->
    <integration:channel id="InboundChannel">
        <integration:queue/>
    </integration:channel>
    <jms:message-driven-channel-adapter
            destination="replyQueue"
            channel="InboundChannel" />

</beans>

处理节点端的配置如下所示(请参阅内联注释以获取 Spring Integration 元素的更多说明):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"   
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"   
    xmlns:stream="http://www.springframework.org/schema/integration/stream" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <import resource="integration-common.xml"/>

    <!-- Read in Message Endpoint Service Activator classes --> 
    <context:component-scan base-package="sample.integration.jmsbasic"/>

    <!-- Listen to incoming messages on the JMS request queue -->
    <integration:channel id="jmsinToProcChannel"/>
    <jms:message-driven-channel-adapter
            destination="requestQueue"
            channel="jmsinToProcChannel"/>

    <!-- Delegate message to service implementation and take care of answer -->
    <integration:service-activator 
            input-channel="jmsinToProcChannel" 
            ref="procService"
            output-channel="jmsBackChannel" />

    <!-- Send answer back to JMS reply queue -->
    <integration:channel id="jmsBackChannel"/>
    <jms:outbound-channel-adapter
                        channel="jmsBackChannel" 
                        destination="replyQueue" />

</beans>

Here is the Spring Integration I was coming up with today, if you find things which could be improved please follow up.

On the client side the messages can be send out and received through a SimpleMessagingGateway:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"   
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"   
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <import resource="integration-common.xml"/>

    <!-- Communication Gateway for the Client (send/receive) -->
    <bean id="gateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
        <property name="requestChannel" ref="SenderChannel"/>
        <property name="replyChannel" ref="InboundChannel"/>
        <property name="replyTimeout" value="1000"/>
    </bean><!-- TODO: could use integration:gateway -->

    <!-- Sending out message to JMS request queue -->
    <integration:channel id="SenderChannel"/>
    <jms:outbound-channel-adapter
                        channel="SenderChannel" 
                        destination="requestQueue" />

    <!-- Listen to incoming messages on JMS reply queue -->
    <integration:channel id="InboundChannel">
        <integration:queue/>
    </integration:channel>
    <jms:message-driven-channel-adapter
            destination="replyQueue"
            channel="InboundChannel" />

</beans>

And the configuration on the processing node side looks like (please see the comments inline for more explanation of the Spring Integration elements):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"   
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"   
    xmlns:stream="http://www.springframework.org/schema/integration/stream" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <import resource="integration-common.xml"/>

    <!-- Read in Message Endpoint Service Activator classes --> 
    <context:component-scan base-package="sample.integration.jmsbasic"/>

    <!-- Listen to incoming messages on the JMS request queue -->
    <integration:channel id="jmsinToProcChannel"/>
    <jms:message-driven-channel-adapter
            destination="requestQueue"
            channel="jmsinToProcChannel"/>

    <!-- Delegate message to service implementation and take care of answer -->
    <integration:service-activator 
            input-channel="jmsinToProcChannel" 
            ref="procService"
            output-channel="jmsBackChannel" />

    <!-- Send answer back to JMS reply queue -->
    <integration:channel id="jmsBackChannel"/>
    <jms:outbound-channel-adapter
                        channel="jmsBackChannel" 
                        destination="replyQueue" />

</beans>
夏末的微笑 2024-09-11 16:09:10

您是否询问 Spring Integration 是否可以用于实现协议 桥接?那么答案是肯定的,而且非常简单。

Are you asking if Spring Integration can be used to implement a protocol bridge? Then the answer is yes, and does so quite simply.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文