如何确保来自 JMS 队列的消息传递到外部 WebService (CXF)?
问题
应该如何在Mule ESB 3.2
中配置ActiveMQ
和
,以确保从队列中拉出的消息最终由外部 CXF 服务正确处理?
场景
我有一个 CXF 端点,它应该接收传入消息并将其尽快传输到三个外部服务。我们称它们为 EX1、EX2、EX3。由于 Mule 3.x 中引入了
组件,这相当容易。
整个解决方案最重要的要求是确保每条收到的消息最终都传递到所有三个 CXF 服务。因此,我们最终得出了将每条传入消息放入持久 JMS 队列(Q1、Q2、Q3)的想法。从队列 Qn 读取消息后,它会直接传输到相应的 EXn 端点,从而传输到外部服务。
配置
(我可以根据要求提供完整的配置)
我们已经按照此处 并将其与我们的
配置连接起来。一切似乎都按预期工作,我已将 JConsole 连接到我的应用程序,因此我可以看到消息的类型为 PERSISTENT,并且它们最终位于正确的队列中。如果一切顺利 - 所有三个服务 EXn 都会收到消息。
测试
当我们关闭其中一项服务(例如 EX2)并重新启动整个服务器模拟故障时,问题就会出现。 消息最终丢失了(我想它并没有那么持久,是吧?)。 最奇怪的是 - 如果我们在 EX2 宕机时发送了 10 条消息,那么在服务器重新启动后,其中 9 条消息将被正确重新发送!所以我想,也许,只是也许,这 10 条消息中的 9 条已正确排队,而当服务器发生故障时,另一条消息会不断重新传送。
这让我想到,CXF 端点没有通过事务支持进行处理,老实说,我无法理解这一点。毕竟,当尝试重新传递消息时,我可以看到消息位于队列中,因此应该保留它。显然不是,但是为什么呢?
我自己的尝试 我尝试了很多方法,但没有一个有效。总是有一条消息丢失。
- 不要在流程中使用任何
标签 - 不起作用 - 在消息接收时启动 jms 事务,在发送到
- 将 XA 与 JBoss 和
一起使用 - 不起作用 - 提供
配置 - 如果我记得它让事情变得更糟,
感谢任何帮助,谢谢。
配置
活动 MQ 配置
<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
<spring:property name="queue" value="queue.*"/>
<spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>
<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
<spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
<spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&broker.persistent=true&broker.useJmx=true"/>
<spring:property name="redeliveryPolicy">
<spring:bean class="org.apache.activemq.RedeliveryPolicy">
<spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
<spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
<spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
<spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
</spring:bean>
</spring:property>
</spring:bean>
<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
<spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>
<spring:bean name="AmqBroker"
class="org.apache.activemq.broker.BrokerService"
init-method="start"
destroy-method="stop">
<spring:property name="brokerName" value="esb-amq-broker"/>
<spring:property name="persistent" value="true"/>
<spring:property name="dataDirectory" value="/home/bachman/activemq"/>
<spring:property name="useJmx" value="true"/>
<spring:property name="useShutdownHook" value="false"/>
<spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
<spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>
<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
disableTemporaryReplyToDestinations="true"/>
FLOW - 将传入消息分派到 3 个队列 Qn
<flow name="dispatch-to-queues">
<inbound-endpoint ref="incoming-cxf"/>
<!-- Each received message ends up to be sent to all destinations -->
<all>
<jms:outbound-endpoint name="queue.q1"
queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q1"
connector-ref="PersistentJMSConnector"/>
<jms:outbound-endpoint name="queue.q2"
queue="queue.q2" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on q2"
connector-ref="PersistentJMSConnector" />
<jms:outbound-endpoint name="queue.q3"
queue="queue.q3" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q3"
connector-ref="PersistentJMSConnector" />
</all>
<custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
</flow>
FLOW - 处理从 Qn 到 EXn 的传递
<flow name="from-q1-to-ex1">
<jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Pull from q1."
connector-ref="PersistentJMSConnector">
<jms:transaction action="ALWAYS_BEGIN" />
</jms:inbound-endpoint>
<logger message="Sending message to EX-1" level="INFO" />
<!-- Handle errors at this point in flow
<custom-processor class="pl.exception.lookup.Component">
<spring:property name="targetModuleName" value="Not-important"/>
</custom-processor>
-->
<outbound-endpoint ref="ex1-cxf-endpoint">
<jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
</outbound-endpoint>
</flow>
ENDPOINTS - 引用端点的声明
<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
<cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
</endpoint>
<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
<cxf:jaxws-client
clientClass="com.mycompany.services.Ex1"
wsdlLocation="classpath:wsdl/ex1.wsdl"
operation="someOperation"
port="SomePort"/>
</endpoint>
The question
How one should configure ActiveMQ
and <flow>
in Mule ESB 3.2
, in order to make sure that message pulled from queue ends up properly handled by external CXF service
?
Scenario
I have an CXF endpoint, which should take incoming message and transfer it to three external services as soon as possible. Let's call them EX1, EX2, EX3. This is fairly easy, thanks to the <all>
component introduced in Mule 3.x.
The most important requirement of the whole solution, is to make sure that each received message ends up being delivered to all three CXF services. So we ended up with the idea, to put each incoming message into Persistent JMS queues
(Q1, Q2, Q3). After message is being read from queue Qn, it's transfered directly to corresponding EXn endpoint, and thus - external service.
Config
(I can provide full config upon request)
We have configured ActiveMQ broker as described here and wired it up with our <flow>
config. Everything seems to work as expected, I have JConsole connected to my application so I can see that messages are of type PERSISTENT and they end up in proper queues. If everything goes smoothly - messages are received by all three services EXn.
Tests
The problem arrises when we turn off one of the services, let's say EX2, and restart the whole server simulating failure. The message ends up being lost (I guess it's not that persistent, huh?).
The most curious thing is - If we sent 10 messages when the EX2 is down, after server restart 9 of them are being properly redelivered! So I'm thinking that maybe, just maybe, 9 of those 10 messages were properly enqueued, while the one was being constantly redelivered when the server failed down.
This causes me to think, that CXF endpoint is not being handled with transaction support, which I cannot understand, to be honest. After all I can see the message being in the queue when it's trying to be redelivered, so it should be persisted. It's clearly not, but why?
My own attempts
I have tried a number of things, none of which have worked. Always one message gets lost.
- Not to use any
<jms:transaction />
tags within the flows - didn't work - Starting jms transaction upon message receive, joining while sending to
<cxf:jaxws-client />
- Using XA with JBoss and
<xa-transaction />
- didn't work - Providing
<default-exception-strategy>
config - If I recall it made things worst
Any help is appreciated, thanks.
CONFIG
ACTIVE MQ CONFIGURATION
<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
<spring:property name="queue" value="queue.*"/>
<spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>
<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
<spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
<spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&broker.persistent=true&broker.useJmx=true"/>
<spring:property name="redeliveryPolicy">
<spring:bean class="org.apache.activemq.RedeliveryPolicy">
<spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
<spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
<spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
<spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
</spring:bean>
</spring:property>
</spring:bean>
<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
<spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>
<spring:bean name="AmqBroker"
class="org.apache.activemq.broker.BrokerService"
init-method="start"
destroy-method="stop">
<spring:property name="brokerName" value="esb-amq-broker"/>
<spring:property name="persistent" value="true"/>
<spring:property name="dataDirectory" value="/home/bachman/activemq"/>
<spring:property name="useJmx" value="true"/>
<spring:property name="useShutdownHook" value="false"/>
<spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
<spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>
<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
disableTemporaryReplyToDestinations="true"/>
FLOW - dispatch incoming message to 3 queues Qn
<flow name="dispatch-to-queues">
<inbound-endpoint ref="incoming-cxf"/>
<!-- Each received message ends up to be sent to all destinations -->
<all>
<jms:outbound-endpoint name="queue.q1"
queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q1"
connector-ref="PersistentJMSConnector"/>
<jms:outbound-endpoint name="queue.q2"
queue="queue.q2" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on q2"
connector-ref="PersistentJMSConnector" />
<jms:outbound-endpoint name="queue.q3"
queue="queue.q3" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q3"
connector-ref="PersistentJMSConnector" />
</all>
<custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
</flow>
FLOW - handle delivery from Qn to EXn
<flow name="from-q1-to-ex1">
<jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Pull from q1."
connector-ref="PersistentJMSConnector">
<jms:transaction action="ALWAYS_BEGIN" />
</jms:inbound-endpoint>
<logger message="Sending message to EX-1" level="INFO" />
<!-- Handle errors at this point in flow
<custom-processor class="pl.exception.lookup.Component">
<spring:property name="targetModuleName" value="Not-important"/>
</custom-processor>
-->
<outbound-endpoint ref="ex1-cxf-endpoint">
<jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
</outbound-endpoint>
</flow>
ENDPOINTS - declaration of referred endpoints
<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
<cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
</endpoint>
<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
<cxf:jaxws-client
clientClass="com.mycompany.services.Ex1"
wsdlLocation="classpath:wsdl/ex1.wsdl"
operation="someOperation"
port="SomePort"/>
</endpoint>
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
要使解决方案按预期工作,必须在事务中使用 JMS 消息:如果 CXF 出站阶段发生异常,JMS 消息最终将回滚,然后重新传递,触发新的 CXF 调用。
您必须仔细配置 ActiveMQ 客户端的重新传递策略,以便重试足够多的次数,并且重试不要太快(例如指数回退)。您还需要适当地处理 DLQ。 ActiveMQ 在 Mule 中使用 Spring Beans 的客户端配置如下所示: http://www.mulesoft.org/mule-activemq-integration -examples
另外,请务必在配置工厂中引用正确的代理 URL。如果您的代理名称为 esb-amq-broker,您的配置工厂应为:
Consuming JMS messages in a transaction is a must for the solution to work as expected: if an exception occurs in the CXF outbound phase, the JMS message will end-up rolled back, then redelivered, triggering a new CXF call.
You must carefully configure the redelivery policy for your ActiveMQ client in order to retry enough times and maybe not too fast (exponential back-off for example). You also want to handle the DLQ appropriately. ActiveMQ's client configuration with Spring Beans in Mule is shown: http://www.mulesoft.org/mule-activemq-integration-examples
Also be sure to refer to the right broker URL in your configuration factory. With your broker name of esb-amq-broker, your configuration factory should be:
我不知道我是否会对您有很大帮助,但这是关于您的问题的一些建议:
祝你好运
华泰
杰罗姆
I don't know if I will help you much but this is a couple of suggestions regarding your problem:
Good luck
HTH
jerome
不确定这种考虑是否有帮助,但是确认模式呢?有没有可能消息已经被传递(在自动确认模式下),但消费服务端点尚未正确处理?
不知道如何在这种情况下配置显式确认,但也许值得进一步研究。
Not sure if this consideration helps, but what about acknowledgement modes? Could it be possible that the message has been delivered already (in auto acknowledge mode) but was not yet properly processed by the consuming service endpoint?
No idea how to configure explicit acknowledgement in this scenario, but maybe worth to investigate further.