如何确保来自 JMS 队列的消息传递到外部 WebService (CXF)?

发布于 2024-12-28 11:35:56 字数 7324 浏览 1 评论 0原文

问题

应该如何在Mule ESB 3.2中配置ActiveMQ,以确保从队列中拉出的消息最终由外部 CXF 服务正确处理?

场景

我有一个 CXF 端点,它应该接收传入消息并将其尽快传输到三个外部服务。我们称它们为 EX1、EX2、EX3。由于 Mule 3.x 中引入了 组件,这相当容易。

整个解决方案最重要的要求是确保每条收到的消息最终都传递到所有三个 CXF 服务。因此,我们最终得出了将每条传入消息放入持久 JMS 队列(Q1、Q2、Q3)的想法。从队列 Qn 读取消息后,它会直接传输到相应的 EXn 端点,从而传输到外部服务。

配置

(我可以根据要求提供完整的配置)

我们已经按照此处 并将其与我们的 配置连接起来。一切似乎都按预期工作,我已将 JConsole 连接到我的应用程序,因此我可以看到消息的类型为 PERSISTENT,并且它们最终位于正确的队列中。如果一切顺利 - 所有三个服务 EXn 都会收到消息。

测试

当我们关闭其中一项服务(例如 EX2)并重新启动整个服务器模拟故障时,问题就会出现。 消息最终丢失了(我想它并没有那么持久,是吧?)。 最奇怪的是 - 如果我们在 EX2 宕机时发送了 10 条消息,那么在服务器重新启动后,其中 9 条消息将被正确重新发送!所以我想,也许,只是也许,这 10 条消息中的 9 条已正确排队,而当服务器发生故障时,另一条消息会不断重新传送。

这让我想到,CXF 端点没有通过事务支持进行处理,老实说,我无法理解这一点。毕竟,当尝试重新传递消息时,我可以看到消息位于队列中,因此应该保留它。显然不是,但是为什么呢?

我自己的尝试 我尝试了很多方法,但没有一个有效。总是有一条消息丢失。

  1. 不要在流程中使用任何 标签 - 不起作用
  2. 在消息接收时启动 jms 事务,在发送到
  3. 将 XA 与 JBoss 和 一起使用 - 不起作用
  4. 提供 配置 - 如果我记得它让事情变得更糟,

感谢任何帮助,谢谢。

配置

活动 MQ 配置

<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
    <spring:property name="queue" value="queue.*"/>
    <spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>

<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
    <spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&amp;broker.persistent=true&amp;broker.useJmx=true"/>
    <spring:property name="redeliveryPolicy">
        <spring:bean class="org.apache.activemq.RedeliveryPolicy">
            <spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
            <spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
            <spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
            <spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
        </spring:bean>
    </spring:property>
</spring:bean>

<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
    <spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>

<spring:bean name="AmqBroker"
             class="org.apache.activemq.broker.BrokerService"
             init-method="start"
             destroy-method="stop">
    <spring:property name="brokerName" value="esb-amq-broker"/>
    <spring:property name="persistent" value="true"/>
    <spring:property name="dataDirectory" value="/home/bachman/activemq"/>
    <spring:property name="useJmx" value="true"/>
    <spring:property name="useShutdownHook" value="false"/>
    <spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
    <spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>

<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
                        numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
                        connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
                        disableTemporaryReplyToDestinations="true"/>

FLOW - 将传入消息分派到 3 个队列 Qn

<flow name="dispatch-to-queues">
        <inbound-endpoint ref="incoming-cxf"/>

        <!-- Each received message ends up to be sent to all destinations -->
        <all>
            <jms:outbound-endpoint name="queue.q1"
                queue="queue.q1" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q1"
                    connector-ref="PersistentJMSConnector"/>

            <jms:outbound-endpoint name="queue.q2"
                queue="queue.q2" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on q2"
                connector-ref="PersistentJMSConnector" />

            <jms:outbound-endpoint name="queue.q3"
                queue="queue.q3" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q3"
                connector-ref="PersistentJMSConnector" />

        </all>
        <custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
    </flow>

FLOW - 处理从 Qn 到 EXn 的传递

<flow name="from-q1-to-ex1">
        <jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
            disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
            doc:name="JMS" doc:description="Pull from q1."
            connector-ref="PersistentJMSConnector">
                <jms:transaction action="ALWAYS_BEGIN" />
        </jms:inbound-endpoint>
        <logger message="Sending message to EX-1" level="INFO" />

        <!-- Handle errors at this point in flow
        <custom-processor class="pl.exception.lookup.Component">
            <spring:property name="targetModuleName" value="Not-important"/>
        </custom-processor>
        -->


        <outbound-endpoint ref="ex1-cxf-endpoint">
            <jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
        </outbound-endpoint>
    </flow>

ENDPOINTS - 引用端点的声明

<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
        <cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
    </endpoint> 

<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
        <cxf:jaxws-client
                clientClass="com.mycompany.services.Ex1"
                wsdlLocation="classpath:wsdl/ex1.wsdl"
                operation="someOperation"
                port="SomePort"/>
    </endpoint>

The question

How one should configure ActiveMQ and <flow> in Mule ESB 3.2, in order to make sure that message pulled from queue ends up properly handled by external CXF service?

Scenario

I have an CXF endpoint, which should take incoming message and transfer it to three external services as soon as possible. Let's call them EX1, EX2, EX3. This is fairly easy, thanks to the <all> component introduced in Mule 3.x.

The most important requirement of the whole solution, is to make sure that each received message ends up being delivered to all three CXF services. So we ended up with the idea, to put each incoming message into Persistent JMS queues (Q1, Q2, Q3). After message is being read from queue Qn, it's transfered directly to corresponding EXn endpoint, and thus - external service.

Config

(I can provide full config upon request)

We have configured ActiveMQ broker as described here and wired it up with our <flow> config. Everything seems to work as expected, I have JConsole connected to my application so I can see that messages are of type PERSISTENT and they end up in proper queues. If everything goes smoothly - messages are received by all three services EXn.

Tests

The problem arrises when we turn off one of the services, let's say EX2, and restart the whole server simulating failure. The message ends up being lost (I guess it's not that persistent, huh?).
The most curious thing is - If we sent 10 messages when the EX2 is down, after server restart 9 of them are being properly redelivered! So I'm thinking that maybe, just maybe, 9 of those 10 messages were properly enqueued, while the one was being constantly redelivered when the server failed down.

This causes me to think, that CXF endpoint is not being handled with transaction support, which I cannot understand, to be honest. After all I can see the message being in the queue when it's trying to be redelivered, so it should be persisted. It's clearly not, but why?

My own attempts
I have tried a number of things, none of which have worked. Always one message gets lost.

  1. Not to use any <jms:transaction /> tags within the flows - didn't work
  2. Starting jms transaction upon message receive, joining while sending to <cxf:jaxws-client />
  3. Using XA with JBoss and <xa-transaction /> - didn't work
  4. Providing <default-exception-strategy> config - If I recall it made things worst

Any help is appreciated, thanks.

CONFIG

ACTIVE MQ CONFIGURATION

<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
    <spring:property name="queue" value="queue.*"/>
    <spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>

<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
    <spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&broker.persistent=true&broker.useJmx=true"/>
    <spring:property name="redeliveryPolicy">
        <spring:bean class="org.apache.activemq.RedeliveryPolicy">
            <spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
            <spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
            <spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
            <spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
        </spring:bean>
    </spring:property>
</spring:bean>

<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
    <spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>

<spring:bean name="AmqBroker"
             class="org.apache.activemq.broker.BrokerService"
             init-method="start"
             destroy-method="stop">
    <spring:property name="brokerName" value="esb-amq-broker"/>
    <spring:property name="persistent" value="true"/>
    <spring:property name="dataDirectory" value="/home/bachman/activemq"/>
    <spring:property name="useJmx" value="true"/>
    <spring:property name="useShutdownHook" value="false"/>
    <spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
    <spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>

<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
                        numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
                        connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
                        disableTemporaryReplyToDestinations="true"/>

FLOW - dispatch incoming message to 3 queues Qn

<flow name="dispatch-to-queues">
        <inbound-endpoint ref="incoming-cxf"/>

        <!-- Each received message ends up to be sent to all destinations -->
        <all>
            <jms:outbound-endpoint name="queue.q1"
                queue="queue.q1" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q1"
                    connector-ref="PersistentJMSConnector"/>

            <jms:outbound-endpoint name="queue.q2"
                queue="queue.q2" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on q2"
                connector-ref="PersistentJMSConnector" />

            <jms:outbound-endpoint name="queue.q3"
                queue="queue.q3" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q3"
                connector-ref="PersistentJMSConnector" />

        </all>
        <custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
    </flow>

FLOW - handle delivery from Qn to EXn

<flow name="from-q1-to-ex1">
        <jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
            disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
            doc:name="JMS" doc:description="Pull from q1."
            connector-ref="PersistentJMSConnector">
                <jms:transaction action="ALWAYS_BEGIN" />
        </jms:inbound-endpoint>
        <logger message="Sending message to EX-1" level="INFO" />

        <!-- Handle errors at this point in flow
        <custom-processor class="pl.exception.lookup.Component">
            <spring:property name="targetModuleName" value="Not-important"/>
        </custom-processor>
        -->


        <outbound-endpoint ref="ex1-cxf-endpoint">
            <jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
        </outbound-endpoint>
    </flow>

ENDPOINTS - declaration of referred endpoints

<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
        <cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
    </endpoint> 

<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
        <cxf:jaxws-client
                clientClass="com.mycompany.services.Ex1"
                wsdlLocation="classpath:wsdl/ex1.wsdl"
                operation="someOperation"
                port="SomePort"/>
    </endpoint>

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

童话 2025-01-04 11:35:56

要使解决方案按预期工作,必须在事务中使用 JMS 消息:如果 CXF 出站阶段发生异常,JMS 消息最终将回滚,然后重新传递,触发新的 CXF 调用。

您必须仔细配置 ActiveMQ 客户端的重新传递策略,以便重试足够多的次数,并且重试不要太快(例如指数回退)。您还需要适当地处理 DLQ。 ActiveMQ 在 Mule 中使用 Spring Beans 的客户端配置如下所示: http://www.mulesoft.org/mule-activemq-integration -examples

另外,请务必在配置工厂中引用正确的代理 URL。如果您的代理名称为 esb-amq-broker,您的配置工厂应为:

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://esb-amq-broker"/>
    ...

Consuming JMS messages in a transaction is a must for the solution to work as expected: if an exception occurs in the CXF outbound phase, the JMS message will end-up rolled back, then redelivered, triggering a new CXF call.

You must carefully configure the redelivery policy for your ActiveMQ client in order to retry enough times and maybe not too fast (exponential back-off for example). You also want to handle the DLQ appropriately. ActiveMQ's client configuration with Spring Beans in Mule is shown: http://www.mulesoft.org/mule-activemq-integration-examples

Also be sure to refer to the right broker URL in your configuration factory. With your broker name of esb-amq-broker, your configuration factory should be:

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://esb-amq-broker"/>
    ...
瀞厅☆埖开 2025-01-04 11:35:56

我不知道我是否会对您有很大帮助,但这是关于您的问题的一些建议:

  • 您是否尝试过使用 Jboss 提供的另一种事务管理器,我建议使用 Atomikos 进行此类测试,
  • 如 David 建议的事务似乎是最好的方法,但另一种方法是使用显式确认策略...设置可能很棘手,但类似拦截器的方法可以监视与某些特定端点的连接并将确认发送回 JMS 服务器, 难的可能是,但它肯定会确保消息已正确传递......

祝你好运
华泰
杰罗姆

I don't know if I will help you much but this is a couple of suggestions regarding your problem:

  • have you tried to use another transaction manager than the one provided with Jboss, I would suggest to use Atomikos for such tests
  • like David suggested Transactions seem to be the best approach , but another approach would be to use explicit acknowledgment policy .... It may be tricky to set up but an interceptor like approach could watch for connections to some specific endpoints and send the ack back to your JMS server, difficult may be but it would definitely ensure that the message has been correctly delivered ....

Good luck
HTH
jerome

牵你的手,一向走下去 2025-01-04 11:35:56

不确定这种考虑是否有帮助,但是确认模式呢?有没有可能消息已经被传递(在自动确认模式下),但消费服务端点尚未正确处理?

不知道如何在这种情况下配置显式确认,但也许值得进一步研究。

Not sure if this consideration helps, but what about acknowledgement modes? Could it be possible that the message has been delivered already (in auto acknowledge mode) but was not yet properly processed by the consuming service endpoint?

No idea how to configure explicit acknowledgement in this scenario, but maybe worth to investigate further.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文