QPID - Spring CachingConnectionFactory - 重新连接

发布于 2024-12-05 23:44:10 字数 8839 浏览 3 评论 0原文

Spring Configuration

 <bean id="jmsQueueConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
    <constructor-arg index="0"
        value="amqp://guest:guest@localhost/test?brokerlist='tcp://localhost:5672'" />
</bean>

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsQueueConnectionFactory" />
    <property name="sessionCacheSize" value="1" />
    <property name="reconnectOnException" value="true" />
</bean>

<bean id="myDestination" class="org.apache.qpid.client.AMQAnyDestination">
    <constructor-arg index="0" value="ADDR:myqueue; {create: always}" />
</bean>

<bean id="myServiceBean" class="com.test.MyService" />

<bean id="myContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="exceptionListener" ref="cachingConnectionFactory" /> 
    <property name="messageListener" ref="myServiceBean" />
    <property name="concurrentConsumers" value="1" />
    <property name="autoStartup" value="true" />
    <property name="destination" ref="myDestination" />
    <property name="recoveryInterval" value="10000" />      
</bean>

MyService.java

public class MyService implements MessageListener {
    public void onMessage(Message msg) {        
    log.info("----On Message called :"+msg+", :"+msg.getClass().getName());         
    }
}

当我重新启动 QPID

没有 reconnectOnException=true 时,我不断收到此异常,但没有重新连接

3203 [myContainer-1] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS Session for mode 1: org.apache.qpid.client.AMQSession_0_10@1d03a4e
3312 [myContainer-1] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS MessageConsumer for destination ['myqueue'/None; {
  'create': 'always'
}]: org.apache.qpid.client.BasicMessageConsumer_0_10@8a2023
99109 [myContainer-1] WARN org.springframework.jms.listener.DefaultMessageListenerContainer  - Setup of JMS message listener invoker failed for destination ''myqueue'/None; {
  'create': 'always'
}' - trying to recover. Cause: timed out waiting for session to become open (state=DETACHED)
org.apache.qpid.transport.SessionException: timed out waiting for session to become open (state=DETACHED)
    at org.apache.qpid.transport.Session.invoke(Session.java:630)
    at org.apache.qpid.transport.Session.invoke(Session.java:559)
    at org.apache.qpid.transport.SessionInvoker.executionSync(SessionInvoker.java:84)
    at org.apache.qpid.transport.Session.sync(Session.java:782)
    at org.apache.qpid.transport.Session.sync(Session.java:770)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.getMessageFromQueue(BasicMessageConsumer_0_10.java:423)
    at org.apache.qpid.client.BasicMessageConsumer.receive(BasicMessageConsumer.java:407)
    at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:74)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
    at java.lang.Thread.run(Thread.java:619)
99109 [myContainer-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection

..

281125 [myContainer-3] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection


使用 reconnectOnException=true ,它会连接和断开连接

    13015 [IoReceiver - localhost/127.0.0.1:5672] WARN org.springframework.jms.connection.CachingConnectionFactory  - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: connection aborted
    at org.apache.qpid.client.AMQConnectionDelegate_0_10.closed(AMQConnectionDelegate_0_10.java:303)
    at org.apache.qpid.transport.Connection.closed(Connection.java:568)
    at org.apache.qpid.transport.network.Assembler.closed(Assembler.java:110)
    at org.apache.qpid.transport.network.InputHandler.closed(InputHandler.java:202)
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:150)
    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.transport.ConnectionException: connection aborted
    at org.apache.qpid.transport.Connection.closed(Connection.java:541)
    ... 4 more
13031 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Closing shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672
Virtual Host: test
Client ID: localhost
Active session count: 1
73031 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Could not close shared JMS Connection
org.apache.qpid.client.JMSAMQException: timed out waiting for session to become open (state=DETACHED)
    at org.apache.qpid.client.AMQConnection.stop(AMQConnection.java:824)
    at org.springframework.jms.connection.SingleConnectionFactory.closeConnection(SingleConnectionFactory.java:422)
    at org.springframework.jms.connection.SingleConnectionFactory.resetConnection(SingleConnectionFactory.java:321)
    at org.springframework.jms.connection.CachingConnectionFactory.resetConnection(CachingConnectionFactory.java:197)
    at org.springframework.jms.connection.SingleConnectionFactory.onException(SingleConnectionFactory.java:302)
    at org.springframework.jms.connection.ChainedExceptionListener.onException(ChainedExceptionListener.java:57)
    at org.apache.qpid.client.AMQConnectionDelegate_0_10.closed(AMQConnectionDelegate_0_10.java:306)
    ...
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:150)
    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.AMQException: timed out waiting for session to become open (state=DETACHED) [error code 541: internal error]
    at org.apache.qpid.client.AMQSession_0_10.setCurrentException(AMQSession_0_10.java:1050)
    at org.apache.qpid.client.AMQSession_0_10.sync(AMQSession_0_10.java:1030)
    at org.apache.qpid.client.AMQSession_0_10.sendSuspendChannel(AMQSession_0_10.java:857)
    at org.apache.qpid.client.AMQSession.suspendChannel(AMQSession.java:3006)
    at org.apache.qpid.client.AMQSession.stop(AMQSession.java:2341)
    at org.apache.qpid.client.AMQConnection.stop(AMQConnection.java:820)
    ... 11 more

104875 [myContainer-1] INFO org.springframework.jms.connection.CachingConnectionFactory  - Established shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672
Virtual Host: test
Client ID: localhost
Active session count: 0
104875 [myContainer-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection
104875 [myContainer-2] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS Session for mode 1: org.apache.qpid.client.AMQSession_0_10@191e4c
104937 [IoReceiver - localhost/127.0.0.1:5672] WARN org.springframework.jms.connection.CachingConnectionFactory  - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: 404
    at org.apache.qpid.client.AMQConnection.exceptionReceived(AMQConnection.java:1230)

    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.AMQException: ch=0 id=0 ExecutionException(errorCode=NOT_FOUND, commandId=0, description=Queue: myqueue not found) [error code 404: not found]
    at org.apache.qpid.client.AMQSession_0_10.setCurrentException(AMQSession_0_10.java:1050)
    ... 29 more
104937 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Closing shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672


Spring Configuration

 <bean id="jmsQueueConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
    <constructor-arg index="0"
        value="amqp://guest:guest@localhost/test?brokerlist='tcp://localhost:5672'" />
</bean>

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsQueueConnectionFactory" />
    <property name="sessionCacheSize" value="1" />
    <property name="reconnectOnException" value="true" />
</bean>

<bean id="myDestination" class="org.apache.qpid.client.AMQAnyDestination">
    <constructor-arg index="0" value="ADDR:myqueue; {create: always}" />
</bean>

<bean id="myServiceBean" class="com.test.MyService" />

<bean id="myContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="exceptionListener" ref="cachingConnectionFactory" /> 
    <property name="messageListener" ref="myServiceBean" />
    <property name="concurrentConsumers" value="1" />
    <property name="autoStartup" value="true" />
    <property name="destination" ref="myDestination" />
    <property name="recoveryInterval" value="10000" />      
</bean>

MyService.java

public class MyService implements MessageListener {
    public void onMessage(Message msg) {        
    log.info("----On Message called :"+msg+", :"+msg.getClass().getName());         
    }
}

When I restart QPID

Without reconnectOnException=true,I keep getting this exception ,but not reconneting

3203 [myContainer-1] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS Session for mode 1: org.apache.qpid.client.AMQSession_0_10@1d03a4e
3312 [myContainer-1] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS MessageConsumer for destination ['myqueue'/None; {
  'create': 'always'
}]: org.apache.qpid.client.BasicMessageConsumer_0_10@8a2023
99109 [myContainer-1] WARN org.springframework.jms.listener.DefaultMessageListenerContainer  - Setup of JMS message listener invoker failed for destination ''myqueue'/None; {
  'create': 'always'
}' - trying to recover. Cause: timed out waiting for session to become open (state=DETACHED)
org.apache.qpid.transport.SessionException: timed out waiting for session to become open (state=DETACHED)
    at org.apache.qpid.transport.Session.invoke(Session.java:630)
    at org.apache.qpid.transport.Session.invoke(Session.java:559)
    at org.apache.qpid.transport.SessionInvoker.executionSync(SessionInvoker.java:84)
    at org.apache.qpid.transport.Session.sync(Session.java:782)
    at org.apache.qpid.transport.Session.sync(Session.java:770)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.getMessageFromQueue(BasicMessageConsumer_0_10.java:423)
    at org.apache.qpid.client.BasicMessageConsumer.receive(BasicMessageConsumer.java:407)
    at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:74)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
    at java.lang.Thread.run(Thread.java:619)
99109 [myContainer-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection

..

281125 [myContainer-3] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection


With reconnectOnException=true ,Its getting connected and disconnected

    13015 [IoReceiver - localhost/127.0.0.1:5672] WARN org.springframework.jms.connection.CachingConnectionFactory  - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: connection aborted
    at org.apache.qpid.client.AMQConnectionDelegate_0_10.closed(AMQConnectionDelegate_0_10.java:303)
    at org.apache.qpid.transport.Connection.closed(Connection.java:568)
    at org.apache.qpid.transport.network.Assembler.closed(Assembler.java:110)
    at org.apache.qpid.transport.network.InputHandler.closed(InputHandler.java:202)
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:150)
    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.transport.ConnectionException: connection aborted
    at org.apache.qpid.transport.Connection.closed(Connection.java:541)
    ... 4 more
13031 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Closing shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672
Virtual Host: test
Client ID: localhost
Active session count: 1
73031 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Could not close shared JMS Connection
org.apache.qpid.client.JMSAMQException: timed out waiting for session to become open (state=DETACHED)
    at org.apache.qpid.client.AMQConnection.stop(AMQConnection.java:824)
    at org.springframework.jms.connection.SingleConnectionFactory.closeConnection(SingleConnectionFactory.java:422)
    at org.springframework.jms.connection.SingleConnectionFactory.resetConnection(SingleConnectionFactory.java:321)
    at org.springframework.jms.connection.CachingConnectionFactory.resetConnection(CachingConnectionFactory.java:197)
    at org.springframework.jms.connection.SingleConnectionFactory.onException(SingleConnectionFactory.java:302)
    at org.springframework.jms.connection.ChainedExceptionListener.onException(ChainedExceptionListener.java:57)
    at org.apache.qpid.client.AMQConnectionDelegate_0_10.closed(AMQConnectionDelegate_0_10.java:306)
    ...
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:150)
    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.AMQException: timed out waiting for session to become open (state=DETACHED) [error code 541: internal error]
    at org.apache.qpid.client.AMQSession_0_10.setCurrentException(AMQSession_0_10.java:1050)
    at org.apache.qpid.client.AMQSession_0_10.sync(AMQSession_0_10.java:1030)
    at org.apache.qpid.client.AMQSession_0_10.sendSuspendChannel(AMQSession_0_10.java:857)
    at org.apache.qpid.client.AMQSession.suspendChannel(AMQSession.java:3006)
    at org.apache.qpid.client.AMQSession.stop(AMQSession.java:2341)
    at org.apache.qpid.client.AMQConnection.stop(AMQConnection.java:820)
    ... 11 more

104875 [myContainer-1] INFO org.springframework.jms.connection.CachingConnectionFactory  - Established shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672
Virtual Host: test
Client ID: localhost
Active session count: 0
104875 [myContainer-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection
104875 [myContainer-2] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS Session for mode 1: org.apache.qpid.client.AMQSession_0_10@191e4c
104937 [IoReceiver - localhost/127.0.0.1:5672] WARN org.springframework.jms.connection.CachingConnectionFactory  - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: 404
    at org.apache.qpid.client.AMQConnection.exceptionReceived(AMQConnection.java:1230)

    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.AMQException: ch=0 id=0 ExecutionException(errorCode=NOT_FOUND, commandId=0, description=Queue: myqueue not found) [error code 404: not found]
    at org.apache.qpid.client.AMQSession_0_10.setCurrentException(AMQSession_0_10.java:1050)
    ... 29 more
104937 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Closing shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672


如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

最美的太阳 2024-12-12 23:44:10

您是否尝试过忽略 DMLC 上的异常监听器设置?我不记得曾经需要指定这一点,而且在启动连接恢复后,您的连接似乎已失效。

另外,假设您没有使用 JBoss 4,您可以尝试使用 DMLC 的内置缓存机制(通过 cacheLevel 设置),而不是为您的消费者使用缓存连接工厂;您甚至可以获得更好的性能,因为 DMLC 可以缓存会话和消费者以及连接。

Have you tried omitting the exceptionListener setting on the DMLC? I don't recall ever needing to specify that, and it seems like your connection is being invalidated after connection recovery has been initiated.

Also, assuming you're not using JBoss 4, you might try using the DMLC's built-in caching mechanism (via the cacheLevel setting) instead of using a caching connection factory for your consumers; you may even get better performance, since DMLC can cache sessions and consumers as well as connections.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文