如何使用 Pika 发送和接收 RabbitMQ 消息?

发布于 2024-10-27 16:37:30 字数 435 浏览 10 评论 0原文

我在让 Pika 以与 AMQP 或 RabbitMQ 文档一致的方式使用路由密钥或交换时遇到一些问题。我知道 RabbitMQ 文档使用旧版本的 Pika,所以我忽略了他们的示例代码。

我想做的是定义一个队列“订单”并有两个消费者,一个处理交换或routing_key“生产”,另一个处理“测试”。从 RabbitMQ 文档来看,通过使用直接交换和路由键或使用主题交换应该很容易做到。

然而,Pika 似乎不知道如何处理交换和路由密钥。使用 RabbitMQ 管理工具检查队列,很明显 Pika 没有正确对消息进行排队,或者 RabbitMQ 只是将其丢弃。

在消费者方面,我不太清楚应该如何将消费者绑定到交换或处理路由密钥,并且文档并没有真正的帮助。

如果我放弃所有想法或交换和路由密钥,消息会很好地排队并且很容易被我的消费者处理。

人们拥有的任何指针或示例代码都会很好。

I'm having some issue getting Pika to work with routing keys or exchanges in a way that's consistent with it AMQP or RabbitMQ documentation. I understand that the RabbitMQ documentation uses an older version of Pika, so I have disregarded their example code.

What I'm trying to do is define a queue, "order" and have two consumers, one that handle the exchange or routing_key "production" and one that handles "test". From looking at that RabbitMQ documentation that should be easy enough to do by using either a direct exchange and routing keys or by using a topic exchange.

Pika however doesn't appear to know what to do with the exchanges and routing keys. Using the RabbitMQ management tool to inspect the queues, it's pretty obvious that Pika either didn't queue the message correctly or that RabbitMQ just threw it away.

On the consumer side it isn't really clear how I should bind a consumer to an exchange or handle routing keys and the documentation isn't really helping.

If I drop all ideas or exchanges and routing keys, messages queue up nicely and are easily handled by my consumer.

Any pointers or example code people have would be nice.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

好多鱼好多余 2024-11-03 16:37:30

事实证明,我对AMQP的理解并不完整。

其思想如下:

客户端

获得连接后的客户端除了交换器的名称和路由密钥之外不应该关心任何其他内容。也就是说,我们不知道这将最终进入哪个队列。

channel.basic_publish(exchange='order',
                      routing_key="order.test.customer",
                      body=pickle.dumps(data),
                      properties=pika.BasicProperties(
                          content_type="text/plain",
                          delivery_mode=2))

Consumer

当通道打开时,我们声明交换和队列

channel.exchange_declare(exchange='order', 
                         type="topic", 
                         durable=True, 
                         auto_delete=False)

channel.queue_declare(queue="test", 
                      durable=True, 
                      exclusive=False, 
                      auto_delete=False, 
                      callback=on_queue_declared)

当队列准备好时,在“on_queue_declared”回调中是一个很好的选择地方,我们可以使用我们想要的路由密钥将队列绑定到交换器。

channel.queue_bind(queue='test', 
                   exchange='order', 
                   routing_key='order.test.customer')

#handle_delivery is the callback that will actually pickup and handle messages
#from the "test" queue
channel.basic_consume(handle_delivery, queue='test') 

使用路由键“order.test.customer”发送到“order”交换的消息现在将被路由到“test”队列,消费者可以在其中接收它。

As it turns out, my understanding of AMQP was incomplete.

The idea is as following:

Client:

The client after getting the connection should not care about anything else but the name of the exchange and the routing key. That is we don't know which queue this will end up in.

channel.basic_publish(exchange='order',
                      routing_key="order.test.customer",
                      body=pickle.dumps(data),
                      properties=pika.BasicProperties(
                          content_type="text/plain",
                          delivery_mode=2))

Consumer

When the channel is open, we declare the exchange and queue

channel.exchange_declare(exchange='order', 
                         type="topic", 
                         durable=True, 
                         auto_delete=False)

channel.queue_declare(queue="test", 
                      durable=True, 
                      exclusive=False, 
                      auto_delete=False, 
                      callback=on_queue_declared)

When the queue is ready, in the "on_queue_declared" callback is a good place, we can bind the queue to the exchange, using our desired routing key.

channel.queue_bind(queue='test', 
                   exchange='order', 
                   routing_key='order.test.customer')

#handle_delivery is the callback that will actually pickup and handle messages
#from the "test" queue
channel.basic_consume(handle_delivery, queue='test') 

Messages send to the "order" exchange with the routing key "order.test.customer" will now be routed to the "test" queue, where the consumer can pick it up.

还在原地等你 2024-11-03 16:37:30

虽然西蒙的答案总体上似乎是正确的,但您可能需要交换参数来使用

channel.basic_consume(queue='test', on_message_callback=handle_delivery) 

基本设置就像

credentials = pika.PlainCredentials("some_user", "some_password")
parameters = pika.ConnectionParameters(
    "some_host.domain.tld", 5672, "some_vhost", credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

开始使用:

channel.start_consuming()

While Simon's answer seems right in general, you might need to swap the parameters for consuming

channel.basic_consume(queue='test', on_message_callback=handle_delivery) 

Basic setup is sth like

credentials = pika.PlainCredentials("some_user", "some_password")
parameters = pika.ConnectionParameters(
    "some_host.domain.tld", 5672, "some_vhost", credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

To start consuming:

channel.start_consuming()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文