RabbitMQ basic.get 和确认

发布于 2024-12-11 14:26:18 字数 310 浏览 0 评论 0原文

我正在调用:

GetResponse response = channel.basicGet("some.queue", false); // no auto-ack
....
channel.basicAck(deliveryTag, ...);

但是,当我调用 basicGet 时,队列中的消息保持“就绪”状态,而不是“未确认”状态。我希望它们处于未确认状态,以便我可以 basic.ack 它们(从而从队列中丢弃它们),或者 basic.nack 它们

I'm invoking:

GetResponse response = channel.basicGet("some.queue", false); // no auto-ack
....
channel.basicAck(deliveryTag, ...);

However, when I invoke basicGet, the messages in the queue stay in "Ready", rather than in "Unacknowledged". I want them to be in unacknowledged, so that I can either basic.ack them (thus discarding them from the queue), or basic.nack them

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

苏辞 2024-12-18 14:26:18

我正在执行以下操作来模仿延迟确认

在消费时从

  1. 初始队列获取(消费)消息。
  2. 创建一个“PendingAck_123456”队列。
    123456 是消息的唯一 ID。
    设置以下属性

    • x-message-ttl(在之后重新排队
      超时)
    • x-expires(以确保临时队列将被删除)
    • x-dead-letter-exchangex-deal-letter-routing-key重新排队
      TTL 到期时的初始队列
  3. 将消息 Pending ack 发布到此“PendingAck_123456”队列 确认
  4. 消息以将其从初始队列中删除

在确认时

  1. 根据消息 Id 计算队列名称并从“PendingAck_123456”队列获取
  2. 确认它(无需调用 .getBody () ).
    这会将其从待处理队列中删除,从而防止 TTL 将其重新排队

。备注

  • 一个队列仅容纳 1 条消息。如果有很多这样的队列,这会是一个问题吗?
  • 重新排队的消息将在队列输入端发送。而不是在队列输出端发送(就像真正的确认一样)。这对消息顺序有影响。
  • 消息由应用程序复制到待处理队列。这是可能会影响整体性能的附加步骤。
  • 要模拟 Nack/Reject,您可能需要将消息复制到初始队列,然后从 PendingAck 队列中确认它。默认情况下,TTL 会执行此操作(稍后)。

I'm doing the following to mimic Delaying the ack:

At consumption time

  1. Get(consume) the message form the initial Queue.
  2. Create a "PendingAck_123456" Queue.
    123456 is a unique id of the message.
    Set the following properties

    • x-message-ttl (to requeue after
      timeout)
    • x-expires (to make sure the temp queue will be deleted)
    • x-dead-letter-exchange and x-deal-letter-routing-key to requeue to
      the initial Queue upon TTL expiration
      .
  3. Publish the message Pending ack to this "PendingAck_123456" Queue
  4. Ack the message to delete it from the initial queue

At Acknowledge time

  1. Calculate Queue Name from Message Id and Get from the "PendingAck_123456" Queue
  2. Acknowledge it (no need to call .getBody() ).
    That'll delete it from this pending queue, preventing the TTL to requeue it

Remarks

  • A Queue for only 1 message.. Is that an issue if there are a lot of such Queues ?
  • A requeued message will be sent at the queue input side.. not at the queue output (as would do a real ack).. There is an impact on the messages order.
  • Message is copied by the application to the Pending Queue.. This is an additional step that may have impacts on the overall performance.
  • To mimic a Nack/Reject, you you may want to Copy the message to the Initial Queue, and Ack it from the PendingAck queue. By default, the TTL would do it (later).
蓝颜夕 2024-12-18 14:26:18

get 之后立即执行 ack 时,效果很好。然而,就我而言,他们是因一个请求而分开的。 spring 的模板会在每次执行时关闭通道和连接。因此,存在三种选择:

  • 在应用程序的整个生命周期中保持一个通道和连接打开;
  • 具有某种对话范围(或最坏情况:使用会话)来存储相同的通道并重用它。
  • 每个请求使用一个通道,立即确认收到,并将消息存储在内存中。

在前两种情况下,你不能使用 spring 的 RabbitTemplate 来做到这一点

When doing ack immediately after the get it works fine. However, in my case, they were separated by a request. And spring's template closes the channel and connection on each execution. So there are three options:

  • keep one channel and connection open throughout the whole lifetime of the application
  • have some kind of conversation-scope (or worst-case: use the session) to store the same channel and reuse it.
  • use one channel per request, acknowledge receipt immediately, and store the messages in memory.

In the former two cases you can't do it with spring's RabbitTemplate

提笔落墨 2024-12-18 14:26:18
GetResponse resp = channel.basicGet(queueName, false);
Envelope envelope = resp.getEnvelope();
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deveryTag,false)
GetResponse resp = channel.basicGet(queueName, false);
Envelope envelope = resp.getEnvelope();
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deveryTag,false)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文