如何使用 py-amqplib 等待多个队列上的消息
我正在使用 py-amqplib 在 Python 中访问 RabbitMQ。应用程序不时收到侦听某些 MQ 主题的请求。
第一次收到这样的请求时,它会创建一个 AMQP 连接和一个通道,并启动一个新线程来侦听消息:
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListener 非常简单:
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while True:
self.__channel.wait()
创建连接后,它会订阅感兴趣的主题,像这样:
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
第一次一切正常。但是,后续订阅另一个主题的请求失败。在后续请求中,我重新使用 AMQP 连接和 AMQPListener 线程(因为我不想为每个主题启动一个新线程),并且当我调用 channel.queue_declare() 上方的代码块时方法调用永远不会返回。我还尝试在此时创建一个新通道,并且 connection.channel() 调用也永远不会返回。
我能够让它工作的唯一方法是为每个主题创建一个新的连接、通道和侦听器线程(即routing_key),但这确实不理想。我怀疑 wait() 方法以某种方式阻止了整个连接,但我不确定该怎么办。当然,我应该能够使用单个侦听器线程接收具有多个路由键(甚至在多个通道上)的消息?
一个相关的问题是:当该主题不再感兴趣时,如何停止侦听器线程?如果没有消息,channel.wait() 调用似乎会永远阻塞。我能想到的唯一方法是向队列发送一条虚拟消息,这会“毒害”它,即。被听者理解为停止的信号。
I'm using py-amqplib to access RabbitMQ in Python. The application receives requests to listen on certain MQ topics from time to time.
The first time it receives such a request it creates an AMQP connection and a channel and starts a new thread to listen for messages:
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListener is very simple:
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while True:
self.__channel.wait()
After creating the connection it subscribes to the topic of interest, like this:
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
The first time this all works fine. However, it fails on a subsequent request to subscribe to another topic. On subsequent requests I re-use the AMQP connection and AMQPListener thread (since I don't want to start a new thread for each topic) and when I call the code block above the channel.queue_declare() method call never returns. I've also tried creating a new channel at that point and the connection.channel() call never returns, either.
The only way I've been able to get it to work is to create a new connection, channel and listener thread per topic (ie. routing_key), but this is really not ideal. I suspect it's the wait() method that's somehow blocking the entire connection, but I'm not sure what to do about it. Surely I should be able to receive messages with several routing keys (or even on several channels) using a single listener thread?
A related question is: how do I stop the listener thread when that topic is no longer of interest? The channel.wait() call appears to block forever if there are no messages. The only way I can think of is to send a dummy message to the queue that would "poison" it, ie. be interpreted by the listener as a signal to stop.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
如果您希望每个频道有多个消费者,只需使用 basic_consume() 附加另一个消费者,然后使用 channel.wait() 即可。它将侦听通过 basic_consume() 连接的所有队列。确保为每个 basic_consume() 定义不同的消费者标签。
如果您想取消队列上的特定使用者(取消监听特定主题),请使用channel.basic_cancel(consumer_tag)。
If you want more than one comsumer per channel just attach another one using basic_consume() and use channel.wait() after. It will listen to all queues attached via basic_consume(). Make sure you define different consumer tags for each basic_consume().
Use channel.basic_cancel(consumer_tag) if you want to cancel a specific consumer on a queue (cancelling listen to a specific topic).