使用RabbitMQ(Java客户端),有没有办法确定消费期间网络连接是否关闭?

发布于 2024-08-25 15:36:17 字数 1279 浏览 12 评论 0原文

我在 RHEL 5.3 上使用 Java 客户端使用 RabbitMQ。我有 2 个节点(机器)。 Node1 正在使用 Java 帮助器类 QueueingConsumer 消费 Node2 上队列中的消息。

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   ... Process message - delivery.getBody()
}

如果 Node1 或 Node2 上的接口被关闭(例如 ifconfig eth1 down),客户端(上面)永远不会知道网络不再存在。 RabbitMQ 是否在 Java 客户端上提供某种类型的配置,可用于确定连接是否已消失。关闭 Node2 上的 RabbitMQ 服务器将触发 ShutdownSignalException,可以捕获该异常,并且应用程序可以进入重新连接循环。但是关闭接口不会导致任何类型的异常发生,因此代码将永远等待consumer.nextDelivery()。

我还尝试过使用此调用的超时版本。例如,

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
int timeout_ms = 30000;
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout_ms);
   if (delivery == null)
   {
      if (channel.isOpen() == false)             // Seems to always return true
      { throw new ShutdownSignalException(); }
   }
   else
   {
     ... Process message - delivery.getBody()
   }
}

但似乎总是返回 true (即使接口已关闭)。我假设在连接上注册 ShutdownListener 会产生相同的结果,但尚未尝试过。

有没有办法配置某种心跳,或者您是否只需要编写自定义租赁逻辑(例如“我现在在这里”)才能使其正常工作?

I'm using RabbitMQ on RHEL 5.3 using the Java client. I have 2 nodes (machines). Node1 is consuming messages from a queue on Node2 using the Java helper class QueueingConsumer.

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   ... Process message - delivery.getBody()
}

If the interface is brought down on Node1 or Node2 (e.g. ifconfig eth1 down), the client (above) never knows the network isn't there anymore. Does RabbitMQ provide some type of configuration on the Java client that can be used to determine if the connection has gone away. Shutting down the RabbitMQ server on Node2 will trigger a ShutdownSignalException, which can be caught and the app can go into a reconnect loop. But bringing down the interface doesn't cause any type of exception to happen, so the code will be waiting forever on consumer.nextDelivery().

I've also tried using the timeout version of this call. e.g.

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("MyQueueOnNode2", noAck, consumer);
int timeout_ms = 30000;
while (true)
{
   QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout_ms);
   if (delivery == null)
   {
      if (channel.isOpen() == false)             // Seems to always return true
      { throw new ShutdownSignalException(); }
   }
   else
   {
     ... Process message - delivery.getBody()
   }
}

but appears that this always returns true (even though the interface is down). I assume registering for the ShutdownListener on the connection will yield the same results, but haven't tried that yet.

Is there a way to configure some sort of heartbeat, or do you just have to write custom lease logic (e.g. "I'm here now") in order to get this to work?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

秋千易 2024-09-01 15:36:17

一般来说,您最好在rabbitmq-discuss邮件列表上发布有关rabbitmq的问题。我们不倾向于跟踪在此之外提出的问题。

您可以配置心跳,但默认情况下它是关闭的。您还可以打开 TCP Keep Alive。创建新连接之前,可以在 ConnectionFactory 上调用 setRequestedHeartbeat,或者在 ConnectionFactory 子类中重写 configureSocket 方法,并调用socket.setKeepAlive(true)。两者都应该导致连接在网络中断时发出通知。

In general, you're much better off posting questions regarding rabbitmq on the rabbitmq-discuss mailing list. We don't tend to track questions being asked outside of this.

There is a heartbeat that you can configure, though it is off by default. You could also turn on TCP Keep Alive. Either call setRequestedHeartbeat on the ConnectionFactory before creating a new connection, or, subclass ConnectionFactory, override the configureSocket method, and call socket.setKeepAlive(true). Both should result in the connection noticing when the network dies.

她比我温柔 2024-09-01 15:36:17

关于 isOpen 方法,文档中有很好的描述: http://www .rabbitmq.com/api-guide.html#shutdown-atomicity

关于关闭:关闭node1或2你指的是应用程序,而不是RabbitMQ服务器本身?为什么您想在任何应用程序上知道另一个应用程序是否与消息代理断开连接?这不是消息传递的重点。

您唯一能做的就是发送带有“强制”参数的消息。这告诉 RabbitMQ 服务器您希望至少有 1 个侦听器来接收您发送的消息(无论是直接队列还是主题/扇出交换中的某个队列)。如果消息无法传递到任何队列,消息将返回到您的通道并转发到给定的 ReturnListener。

Regarding the isOpen method, that is well described in the docs: http://www.rabbitmq.com/api-guide.html#shutdown-atomicity

Regarding the shutting down: with shutting down node1 or 2 you mean the application right, not the RabbitMQ server itself? Why would you want to know on any application if another application disconnects from the message broker? That's not the point of messaging.

The only thing you can do, is send messages with a 'mandatory' parameter. That tells the RabbitMQ server you expect at least 1 listener for the message you've sent (whether that be a direct queue or some queue in a topic/fanout exchange). If the message then can not be delivered to any queue, the message will return to your channel and forwarded to given ReturnListener.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文