如果队列为空 RabbitMQ 如何停止消费者?

发布于 2025-01-13 16:54:36 字数 833 浏览 3 评论 0原文

所以我是 RabbitMQ 的新手,我已经实现了一个简单的生产者-消费者,对于我的用例,如果队列为空,我需要停止消费者,但我找不到任何解决方案。 发件人:

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

收件人:

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

so im new to RabbitMQ, i have implemented a simple producer-consumer and for my use case i need to stop the consumer if the queue is empty but i can't find any solution.
sender:

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

reciver:

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

舂唻埖巳落 2025-01-20 16:54:36

您可以获取队列中消息的数量,如果等于 0 则退出循环。

import pika
connection = pika.BlockingConnection()
channel = connection.channel()
q = channel.queue_declare(q_name)
q_len = q.method.message_count

You can get the count of messages in queue, then exit the loop if its equal to 0.

import pika
connection = pika.BlockingConnection()
channel = connection.channel()
q = channel.queue_declare(q_name)
q_len = q.method.message_count
小鸟爱天空丶 2025-01-20 16:54:36

或者,您可以使用超时功能。例如,如果经过一定时间并且您的消费者闲置,您可以杀死您的工作人员/进程/程序。

import pika
import _thread
from threading import Timer

q_name = "hello"
conn = pika.BlockingConnection()
ch = conn.channel()
ch.queue_declare(q_name)

timeout_sec = 5 # times out in 5s


def timer():
    return Timer(timeout_sec, lambda: _thread.interrupt_main())


def callback(t, ch, method, properties, body):
    t.cancel()
    print("[x] Received:", body)
    t = timer()
    t.start()


try:
    t = timer()
    t.start()
    ch.basic_consume(
        queue=q_name,
        on_message_callback=lambda *args, **kwargs: callback(t, *args, **kwargs),
        auto_ack=True,
    )
    ch.start_consuming()
except KeyboardInterrupt:
    print("Nothing left in queue exiting....")

如果消费者闲置 5 秒,上述程序就会终止。
另请注意,我的 on_message_callback 封装了 pika 所需的常规回调。这本质上是将计时器的实例传递给 pika 的回调以启动/停止计时器。

Alternatively, you could use a timeout functionality. As in, if after certain time passes by and your consumer is sitting idle, you can kill your worker/process/program.

import pika
import _thread
from threading import Timer

q_name = "hello"
conn = pika.BlockingConnection()
ch = conn.channel()
ch.queue_declare(q_name)

timeout_sec = 5 # times out in 5s


def timer():
    return Timer(timeout_sec, lambda: _thread.interrupt_main())


def callback(t, ch, method, properties, body):
    t.cancel()
    print("[x] Received:", body)
    t = timer()
    t.start()


try:
    t = timer()
    t.start()
    ch.basic_consume(
        queue=q_name,
        on_message_callback=lambda *args, **kwargs: callback(t, *args, **kwargs),
        auto_ack=True,
    )
    ch.start_consuming()
except KeyboardInterrupt:
    print("Nothing left in queue exiting....")

The above program will die if the consumer sits idle for 5 seconds.
Also note that my on_message_callback is encapsulating a regular callback that's expected by pika. That is essentially passing an instance of timer to pika's callback in order to start/stop the timer.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文