Python Kombu 消费者未收到rabbitmq消息通知(queue.get确实有效)

发布于 2024-10-20 18:53:24 字数 1704 浏览 6 评论 0原文

如果我运行以下代码,回调(测试)将传递到 消费者永远不会被触发。

但是,如果我密切关注rabbitmq GUI,我确实会看到消息已被检索(但未确认)。所以看起来消费者正在收到消息,但没有将其传递给我的回调。如果我将 no_ack 设置为 true,消息就会从队列中消失,同样不会调用回调。

hn = "..."
usr = "..."
pwd = "..."
vh = "/"
port = 5672
rkey = "some.routing.key"
qname = "some-queue-name"
exchangeName = "MyExchange"

connection = BrokerConnection(hostname=hn,
                              userid=usr,
                              password=pwd,
                              virtual_host=vh,
                              port=port)

connection.connect()
ch = connection.channel()

# Create & the exchange
exchange = Exchange(name=exchangeName,
              type="topic",
              channel=ch,
              durable=True)

exchange.declare()

# Temporary channel
ch = connection.channel()

# Create the queue to feed from
balq = Queue(name=qname,
              exchange=exchange,
              durable=True,
              auto_delete=False,
              channel=ch,
              routing_key=rkey)        

# Declare it on the server
balq.declare();

def test(b,m):
    print '** Message Arrived **'

# Create a consumer
consumer = Consumer(channel=connection.channel(),
                    queues=balq,
                    auto_declare=False,
                    callbacks = [test]
                    )

# register it on the server
consumer.consume(no_ack=False);

print 'Waiting for messages'
while(True):
    pass

但是,以下代码确实可以正常工作(我可以成功获取并确认该消息):

m = balq.get(no_ack=False)
m.ack()
print m

但重点是保持异步。所以我的回调一定有问题..

If I run the following code, the callback (test) passed to the consumer is never triggered.

However, if I keep an eye on the rabbitmq GUI, I do see that the message is retrieved (but not acknowledged). So it seems the consumer is getting the message, but not passing it on to my callback. If I set no_ack to true, the message just disappears from the queue, again without calling the callback.

hn = "..."
usr = "..."
pwd = "..."
vh = "/"
port = 5672
rkey = "some.routing.key"
qname = "some-queue-name"
exchangeName = "MyExchange"

connection = BrokerConnection(hostname=hn,
                              userid=usr,
                              password=pwd,
                              virtual_host=vh,
                              port=port)

connection.connect()
ch = connection.channel()

# Create & the exchange
exchange = Exchange(name=exchangeName,
              type="topic",
              channel=ch,
              durable=True)

exchange.declare()

# Temporary channel
ch = connection.channel()

# Create the queue to feed from
balq = Queue(name=qname,
              exchange=exchange,
              durable=True,
              auto_delete=False,
              channel=ch,
              routing_key=rkey)        

# Declare it on the server
balq.declare();

def test(b,m):
    print '** Message Arrived **'

# Create a consumer
consumer = Consumer(channel=connection.channel(),
                    queues=balq,
                    auto_declare=False,
                    callbacks = [test]
                    )

# register it on the server
consumer.consume(no_ack=False);

print 'Waiting for messages'
while(True):
    pass

However, the following code does work properly (I can successfully get and acknowledge the message):

m = balq.get(no_ack=False)
m.ack()
print m

But the whole point was to stay asynchronous. So something must be wrong with my callback..

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

睫毛溺水了 2024-10-27 18:53:24

事实证明这是一个简单的错误。添加

connection.drain_events()

到 while 循环会导致消息到达。

Turns out its a simple error. Adding

connection.drain_events()

to the while loop causes messages to arrive.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文