How to kill "ActiveMQ Transport" threads after interrupting JMS MessageConsumer.receive()
I have a class that obtains an ActiveMQ connection via ActiveMQConnectionFactory.createConnection(). It then creates and owns the session, the destination (a queue), and the consumer listening on that destination.
I call receive() or receive(millis) on the consumer to take messages off the queue. In certain scenarios I have to kill (interrupt) the thread in which receive is being called, and I try to close the session and the connection right after that. Unfortunately, close consistently throws an exception, and the associated "ActiveMQ Transport" threads (and their connections to the broker) stay alive. The exception I get is:
org.myorg.awsiface.communication.MessagingException: Failed to close JMS connection
    at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:253)
    at org.myorg.aws.communication.protocol.ContextFinishedProtocol.cleanUp(ContextFinishedProtocol.java:94)
    at org.myorg.myservice.job.Job.run(Job.java:206)
Caused by: javax.jms.JMSException: java.io.InterruptedIOException
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1227)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
    at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1799)
    at org.apache.activemq.ActiveMQMessageConsumer.doClose(ActiveMQMessageConsumer.java:636)
    at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:627)
    at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:232)
    ... 2 more
Caused by: java.io.InterruptedIOException
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:102)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
    ... 7 more
I've tried the following connection URLs:
failover://tcp://broker.ip.address:61616
tcp://broker.ip.address:61616
failover://tcp://broker.ip.address:61616?trace=true&closeAsync=false
I've also tried polling with MessageConsumer::receiveNoWait() and doing my own Thread.sleep between calls, but I still get the exception above whenever I run the following to close the connection:
try {
    // I added the following two lines to see if the taskRunner was handling the threads - still no luck
    TaskRunnerFactory taskRunner = ((ActiveMQConnection) connection).getSessionTaskRunner();
    taskRunner.shutdown();
    if (producer != null) {
        producer.close();
    }
    if (consumer != null) {
        consumer.close();
    }
    session.close();
    if (connection instanceof ActiveMQConnection) {
        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
        amqConnection.stop();
    }
    connection.stop();
    connection.close();
}
catch (ConnectionClosedException e) {
    // NOOP - this is ok
}
catch (Exception e) {
    throw new MessagingException("Failed to close JMS connection", e, log);
}
Comments (2)
I'm not sure why close is not working for you, but you can accomplish what you want with an asynchronous message listener (MessageConsumer.setMessageListener).
Then there's no need to interrupt a blocked receive. When you are done, close the consumer, the session, and the connection.
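If interrupting the receiving thread cannot be avoided, one likely explanation for the trace in the question (an assumption, not something the question confirms) is that cleanUp runs on the same thread that was interrupted, so the thread's interrupt flag is still set when close() reaches a blocking step in the transport. The sketch below reproduces that effect with plain JDK classes only: `blockingStep` is a hypothetical stand-in built on a CountDownLatch, not ActiveMQ code, and `cleanUpIgnoringInterrupt` shows the general pattern of clearing the flag around cleanup and restoring it afterwards.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterruptFlagDemo {

    /**
     * Hypothetical stand-in for a blocking step inside close().
     * Returns true if the step completed, false if it was interrupted.
     */
    static boolean blockingStep() {
        CountDownLatch alreadyOpen = new CountDownLatch(0);
        try {
            // Even with a count of zero, await() throws immediately
            // if the calling thread's interrupt flag is already set.
            alreadyOpen.await(100, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            // await() clears the interrupt flag when it throws.
            return false;
        }
    }

    /** Clears the interrupt flag, runs the cleanup step, then restores the flag. */
    static boolean cleanUpIgnoringInterrupt() {
        boolean wasInterrupted = Thread.interrupted(); // reads AND clears the flag
        try {
            return blockingStep();
        } finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt(); // restore for callers
            }
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        System.out.println("cleanup with flag set: " + blockingStep());

        Thread.currentThread().interrupt();
        System.out.println("cleanup with flag cleared: " + cleanUpIgnoringInterrupt());

        Thread.interrupted(); // leave the main thread with a clean flag
    }
}
```

Applied to the question's code, the same idea would mean calling Thread.interrupted() before the close calls, or running the cleanup on a different thread. Whether that is enough for a given ActiveMQ version is untested here.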
One solution could be to set the daemon flag on the ActiveMQ TCP transport threads,
e.g.
tcp://URI:PORT?daemon=true
Check out ActiveMQ documentation: http://activemq.apache.org/tcp-transport-reference.html