RabbitMQ:setReturnListner handleBasicReturn nt 因未传递的消息而被调用

发布于 2024-10-10 11:05:13 字数 1973 浏览 4 评论 0原文

对于其中一项要求,我们需要跟踪队列深度和成功处理的消息。这个想法是发布消息并获取成功和失败消息的列表。为了模拟需求,我执行了以下操作

  1. 发布带有强制和立即标志的消息发送channel.basicPublish 'exchange','rKey',true,false,props,“Hello World”.bytes
  2. 消费者消耗甚至标记(我已经输入了数字从 1..10 作为每个消息头中的标记值)并且不ACKS奇数编号的消息。
  3. 我在发布者中实现了 setReturnListnere 以捕获未传递的消息。

虽然我能够通过 Rabbmitmqctl list_queues messages_unacknowledged 获取 unack 消息的数量,但不知何故,我的 handleBasicReturn 方法没有被调用。我错过了一些东西。

代码片段:

生产者:

channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode, String replyText, String exchange,
                                  String routingKey, AMQP.BasicProperties properties,
                                  byte[] body) 
            throws IOException {
        println "Debugging messages!!!!"
        println "The details of returned messages are ${replyText} from  ${exchange} with routingKey as ${routingKey} with properties"
    }
});

println " queuename is ${dec.queue} and consumerCount is ${dec.consumerCount} messageCount is ${dec.messageCount}"
(1..10).each {
    println "Sending file ${i}....."
    def headers = new HashMap<String,Object>()
    headers.put "operatiion","scp"
    headers.put "dest","joker.dk.mach.com"
    headers.put "id", i
    println headers

    BasicProperties props = new BasicProperties(null, null, headers, null, null, null, null, null,null, null, null, null,null, null)
                    channel.basicPublish 'exchange' ,'rKey',true,false, props,"Hello Worls".bytes                                
    i++                         
}
channel.close()

消费者:

while (true) {
    def delivery = consumer.nextDelivery()
    def headers = delivery?.properties?.headers
    def id = headers.get("id")
    println "Received message:"
    println " ${id.toString()}"

    if( id % 2 == 0){
        channel.basicAck delivery.envelope.deliveryTag, false                
    }    
}

For one of the requirement we need to keep track of queue depth and successfully processed messages. The idea is to publish messages and get a list of successful and failed messages. To simulate the requirement I did the following

  1. Publish the messages with Mandatory and Immediate flag sent channel.basicPublish 'exchange' ,'rKey',true,false, props,"Hello World".bytes
  2. The consumer consumes even marked ( I have put numbers from 1..10 as marked value in header of each messages) and does not ACKS odd numbered messages.
  3. I have implemented setReturnListnere in the publisher to capture undelivered messages.

While am able to get the number of unack messages via Rabbmitmqctl list_queues messages_unacknowledged, somehow my handleBasicReturn method does not gets called. Am in missing something.

Code snippets:

Producer:

channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode, String replyText, String exchange,
                                  String routingKey, AMQP.BasicProperties properties,
                                  byte[] body) 
            throws IOException {
        println "Debugging messages!!!!"
        println "The details of returned messages are ${replyText} from  ${exchange} with routingKey as ${routingKey} with properties"
    }
});

println " queuename is ${dec.queue} and consumerCount is ${dec.consumerCount} messageCount is ${dec.messageCount}"
(1..10).each {
    println "Sending file ${i}....."
    def headers = new HashMap<String,Object>()
    headers.put "operatiion","scp"
    headers.put "dest","joker.dk.mach.com"
    headers.put "id", i
    println headers

    BasicProperties props = new BasicProperties(null, null, headers, null, null, null, null, null,null, null, null, null,null, null)
                    channel.basicPublish 'exchange' ,'rKey',true,false, props,"Hello Worls".bytes                                
    i++                         
}
channel.close()

Consumer:

while (true) {
    def delivery = consumer.nextDelivery()
    def headers = delivery?.properties?.headers
    def id = headers.get("id")
    println "Received message:"
    println " ${id.toString()}"

    if( id % 2 == 0){
        channel.basicAck delivery.envelope.deliveryTag, false                
    }    
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

草莓味的萝莉 2024-10-17 11:05:13

阅读此说明了解立即标志和强制标志如何影响 RabbitMQ 中的消息传递。

在您的情况下,由于您有一个消费者接收消息,因此即使消费者从未确认消息,也不会返回消息。

D .

Read this explanation about how the immediate and mandatory flags impact message delivery in RabbitMQ.

In your case, since you have a consumer receiving messages, messages won't be returned even if the consumer never acks them.

D.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文