How to kill "ActiveMQ Transport" threads after interrupting JMS MessageConsumer.receive()

Posted 2024-11-24 04:59:06

I have a class that acquires an ActiveMQ connection using ActiveMQConnectionFactory.createConnection().
It then creates and owns the session, the destination and the consumer (queue) listening on that destination.

I call receive() or receive(millis) on the consumer to take messages off the queue. In certain scenarios I have to kill (interrupt) the thread in which the receive method is being called, and I try to close the session and the connection right after that. Unfortunately, close consistently throws an exception, and the associated "ActiveMQ Transport" threads (and the related connections to the broker) stay alive. The exception I get is:

org.myorg.awsiface.communication.MessagingException: Failed to close JMS connection
    at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:253)
    at org.myorg.aws.communication.protocol.ContextFinishedProtocol.cleanUp(ContextFinishedProtocol.java:94)
    at org.myorg.myservice.job.Job.run(Job.java:206)
Caused by: javax.jms.JMSException: java.io.InterruptedIOException
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1227)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
    at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1799)
    at org.apache.activemq.ActiveMQMessageConsumer.doClose(ActiveMQMessageConsumer.java:636)
    at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:627)
    at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:232)
    ... 2 more
Caused by: java.io.InterruptedIOException
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:102)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
    ... 7 more

I've tried the following connection URLs:
failover://tcp://broker.ip.address:61616
tcp://broker.ip.address:61616
failover://tcp://broker.ip.address:61616?trace=true&closeAsync=false

I've also tried calling MessageConsumer.receiveNoWait() and doing my own Thread.sleep between polls, but I always end up with the exception above when I run the following code to close the connection:

try {
    // I added the following two lines to see if the taskRunner was handling the threads - still no luck
    TaskRunnerFactory taskRunner = ((ActiveMQConnection)connection).getSessionTaskRunner();
    taskRunner.shutdown();

    if (producer != null) {
        producer.close();
    }
    if (consumer != null) {
        consumer.close();
    }
    session.close();
    if (connection instanceof ActiveMQConnection) {
        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
        amqConnection.stop();
    }           
    connection.stop();
    connection.close();
}
catch (ConnectionClosedException e) {
    // NOOP - this is ok
}
catch (Exception e) {
    throw new MessagingException("Failed to close JMS connection", e, log);
}

Comments (2)

大海や 2024-12-01 04:59:06

I'm not sure why the close is not working for you. But you can accomplish what you want by using an asynchronous message listener.

Connection connection = connectionFactory.createConnection();
connection.start();

Session session = 
    connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination queue = session.createQueue("queueName");

MessageConsumer consumer = session.createConsumer( queue );

consumer.setMessageListener( new MessageListener()
{
    public void onMessage( Message message )
    { 
         // whatever was after the receive() method call
    }
} );

Then there's no need to be interrupted. When you are done, close things out:

consumer.close();
session.close();
// the queue is a javax.jms.Destination, which has no close() method, so it needs no cleanup
connection.close();
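
As for why close() fails in the question, one possible explanation (an assumption read off the stack trace, not a confirmed diagnosis) is that the worker thread's interrupt flag is still set when cleanUp() runs. The trace ends in an InterruptedIOException thrown from WireFormatNegotiator.oneway, which would fit a blocking wait that refuses to proceed on an already interrupted thread. Under that assumption, clearing the flag for the duration of the cleanup and restoring it afterwards may let the close commands reach the broker. The sketch below uses only the JDK; InterruptSafeCleanup, callUninterrupted and blockingClose are hypothetical names, and blockingClose is a stand-in for a close() that waits on a latch internally, not ActiveMQ code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class InterruptSafeCleanup {

    /**
     * Runs a cleanup action with the calling thread's interrupt flag cleared,
     * then restores the flag so code further up the stack still sees the interrupt.
     */
    static <T> T callUninterrupted(Supplier<T> cleanup) {
        // Thread.interrupted() returns the current flag and clears it in one step.
        boolean wasInterrupted = Thread.interrupted();
        try {
            return cleanup.get();
        } finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Hypothetical stand-in for a close() that waits on a latch before sending. */
    static String blockingClose() {
        CountDownLatch ready = new CountDownLatch(0); // already released
        try {
            // await() checks the interrupt flag on entry and throws if it is set,
            // even though the latch itself would not block.
            ready.await(1, TimeUnit.SECONDS);
            return "closed";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the flag visible to callers
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate the interrupted worker thread

        System.out.println(blockingClose());                                          // interrupted
        System.out.println(callUninterrupted(InterruptSafeCleanup::blockingClose));   // closed
        System.out.println(Thread.currentThread().isInterrupted());                   // true
    }
}
```

In the question's code this would mean wrapping the body of cleanUp() in something like callUninterrupted, so that consumer.close(), session.close() and connection.close() all run with the flag cleared. Whether that also stops the transport threads depends on the close actually completing, which I have not verified against a broker.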

极致的悲 2024-12-01 04:59:06

One solution could be to set daemon flag on TCP ActiveMQ transport thread,
e.g. tcp://URI:PORT?daemon=true

Check out ActiveMQ documentation: http://activemq.apache.org/tcp-transport-reference.html
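
To make clear what a daemon flag changes in general, here is a small JDK-only sketch. It is not ActiveMQ code: DaemonThreadSketch and newTransportLikeThread are hypothetical names, and the looping thread merely stands in for a transport reader thread. A daemon thread does not keep the JVM alive once all non-daemon threads have finished, so the option would let the process exit, but it would not by itself close the connection to the broker.

```java
class DaemonThreadSketch {

    /** Creates (but does not start) a thread that loops until interrupted. */
    static Thread newTransportLikeThread(boolean daemon) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(1000); // pretend to wait for data from a socket
                }
            } catch (InterruptedException e) {
                // interrupted: fall through and let the thread end
            }
        }, "Transport-like thread");
        t.setDaemon(daemon); // must be set before start()
        return t;
    }

    public static void main(String[] args) {
        Thread t = newTransportLikeThread(true);
        t.start();
        System.out.println("daemon=" + t.isDaemon());
        // main returns here. Because t is a daemon thread, the JVM can exit
        // even though t is still looping; with daemon=false it would hang.
    }
}
```

If the goal is to release the broker-side connection rather than just let the JVM exit, a successful connection.close() is still needed.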
