如果队列为空 RabbitMQ 如何停止消费者?
所以我是 RabbitMQ 的新手,我已经实现了一个简单的生产者-消费者,对于我的用例,如果队列为空,我需要停止消费者,但我找不到任何解决方案。 发件人:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
收件人:
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
so im new to RabbitMQ, i have implemented a simple producer-consumer and for my use case i need to stop the consumer if the queue is empty but i can't find any solution.
sender:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
reciver:
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
您可以获取队列中消息的数量,如果等于 0 则退出循环。
You can get the count of messages in queue, then exit the loop if its equal to 0.
或者,您可以使用超时功能。例如,如果经过一定时间并且您的消费者闲置,您可以杀死您的工作人员/进程/程序。
如果消费者闲置 5 秒,上述程序就会终止。
另请注意,我的
on_message_callback
封装了pika
所需的常规回调。这本质上是将计时器的实例传递给 pika 的回调以启动/停止计时器。Alternatively, you could use a timeout functionality. As in, if after certain time passes by and your consumer is sitting idle, you can kill your worker/process/program.
The above program will die if the consumer sits idle for 5 seconds.
Also note that my
on_message_callback
is encapsulating a regular callback that's expected bypika
. That is essentially passing an instance of timer topika
's callback in order to start/stop the timer.