如何使用 py-amqplib 等待多个队列上的消息

发布于 2024-08-13 03:15:03 字数 1631 浏览 5 评论 0原文

我正在使用 py-amqplib 在 Python 中访问 RabbitMQ。应用程序不时收到侦听某些 MQ 主题的请求。

第一次收到这样的请求时,它会创建一个 AMQP 连接和一个通道,并启动一个新线程来侦听消息:

    connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
    channel = connection.channel()

    listener = AMQPListener(channel)
    listener.start()

AMQPListener 非常简单:

class AMQPListener(threading.Thread):
    def __init__(self, channel):
        threading.Thread.__init__(self)
        self.__channel = channel

    def run(self):
        while True:
            self.__channel.wait()

创建连接后,它会订阅感兴趣的主题,像这样:

channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)

def receive_callback(msg):
    self.queue.put(msg.body)

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)

第一次一切正常。但是,后续订阅另一个主题的请求失败。在后续请求中,我重新使用 AMQP 连接和 AMQPListener 线程(因为我不想为每个主题启动一个新线程),并且当我调用 channel.queue_declare() 上方的代码块时方法调用永远不会返回。我还尝试在此时创建一​​个新通道,并且 connection.channel() 调用也永远不会返回。

我能够让它工作的唯一方法是为每个主题创建一个新的连接、通道和侦听器线程(即routing_key),但这确实不理想。我怀疑 wait() 方法以某种方式阻止了整个连接,但我不确定该怎么办。当然,我应该能够使用单个侦听器线程接收具有多个路由键(甚至在多个通道上)的消息?

一个相关的问题是:当该主题不再感兴趣时,如何停止侦听器线程?如果没有消息,channel.wait() 调用似乎会永远阻塞。我能想到的唯一方法是向队列发送一条虚拟消息,这会“毒害”它,即。被听者理解为停止的信号。

I'm using py-amqplib to access RabbitMQ in Python. The application receives requests to listen on certain MQ topics from time to time.

The first time it receives such a request it creates an AMQP connection and a channel and starts a new thread to listen for messages:

    connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
    channel = connection.channel()

    listener = AMQPListener(channel)
    listener.start()

AMQPListener is very simple:

class AMQPListener(threading.Thread):
    def __init__(self, channel):
        threading.Thread.__init__(self)
        self.__channel = channel

    def run(self):
        while True:
            self.__channel.wait()

After creating the connection it subscribes to the topic of interest, like this:

channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)

def receive_callback(msg):
    self.queue.put(msg.body)

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)

The first time this all works fine. However, it fails on a subsequent request to subscribe to another topic. On subsequent requests I re-use the AMQP connection and AMQPListener thread (since I don't want to start a new thread for each topic) and when I call the code block above the channel.queue_declare() method call never returns. I've also tried creating a new channel at that point and the connection.channel() call never returns, either.

The only way I've been able to get it to work is to create a new connection, channel and listener thread per topic (ie. routing_key), but this is really not ideal. I suspect it's the wait() method that's somehow blocking the entire connection, but I'm not sure what to do about it. Surely I should be able to receive messages with several routing keys (or even on several channels) using a single listener thread?

A related question is: how do I stop the listener thread when that topic is no longer of interest? The channel.wait() call appears to block forever if there are no messages. The only way I can think of is to send a dummy message to the queue that would "poison" it, ie. be interpreted by the listener as a signal to stop.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

执着的年纪 2024-08-20 03:15:03

如果您希望每个频道有多个消费者,只需使用 basic_consume() 附加另一个消费者,然后使用 channel.wait() 即可。它将侦听通过 basic_consume() 连接的所有队列。确保为每个 basic_consume() 定义不同的消费者标签。

如果您想取消队列上的特定使用者(取消监听特定主题),请使用channel.basic_cancel(consumer_tag)

If you want more than one comsumer per channel just attach another one using basic_consume() and use channel.wait() after. It will listen to all queues attached via basic_consume(). Make sure you define different consumer tags for each basic_consume().

Use channel.basic_cancel(consumer_tag) if you want to cancel a specific consumer on a queue (cancelling listen to a specific topic).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文