如何在AMQP的python客户端中使用listen on basic.return

发布于 2024-08-19 23:14:48 字数 378 浏览 8 评论 0原文

我想确保我的消息已传递到队列中。

为此,我将强制参数添加到 basic_publish 中。 如果我的消息未成功发送,我还应该做什么才能收到 basic.return 消息?

我无法使用 channel.wait() 监听 basic.return,因为当我的消息成功传递时,wait() 函数永远挂起。 (没有超时) 另一方面。当我不调用 channel.wait() 时,channel.returned_messages 将保持为空,即使消息未传递。

我使用 py-amqplib 版本 0.6。

欢迎任何解决方案。

I'd like to make sure that my message was delivered to a queue.

To do so I'm adding the mandatory param to the basic_publish.
What else should I do to receive the basic.return message if my message wasn't successfully delivered?

I can't use channel.wait() to listen for the basic.return because when my message is successfully delivered the wait() function hangs forever. (There is no timeout)
On the other hand. When I don't call channel.wait() the channel.returned_messages will remain empty, even if the message isn't delivered.

I use py-amqplib version 0.6.

Any solution is welcome.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

花开柳相依 2024-08-26 23:14:48

目前这是不可能的,因为当消息在代理中删除时,basic.return 是异步发送的。消息发送成功后,服务器不会报告任何数据。
所以 pyAMQP 无法监听此类消息。

我读过一些关于这个问题的帖子。可能的解决方案是:

  • 使用 txAMQP,处理 basic.return 的 amqp 的扭曲版本,
  • 使用 pyAMQP 和超时等待。 (我不确定当前是否可能)
  • 使用同步命令频繁 ping 服务器,以便 pyAMQP 能够在消息到达时选择 basic.return 消息。

由于对 pyAMQP 和rabbitMQ 的支持水平总体来说相当低,因此我们决定根本不使用 amqp 代理。

It is currently impossible as the basic.return is sent asynchronously when a message is dropped in broker. When message was sent successfully no data is reported from server.
So pyAMQP can't listen for such messages.

I've read few threads about this problem. Possible solution were:

  • use txAMQP, twisted version of amqp that handles basic.return
  • use pyAMQP with wait with timeout. (I'm not sure if that is currently possible)
  • ping server frequently with synchronous commands so that pyAMQP will able to pick basic.return messages when they arrive.

Because the level of support for pyAMQP and rabbitMQ in general is quite low, we decided not to use amqp broker at all.

氛圍 2024-08-26 23:14:48

您是否尝试过唯一完整的 Python AMQP 库?它没有被广泛使用,因为它的包装不整齐。

步骤 1. 编译 C 库 - 您可能需要 sudo apt-get install autotools-dev autoconf automake libtool

mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install

步骤 2. 安装 Python 库

pip install pylibrabbitmq

Have you tried the only Python AMQP library that is complete? It isn't as widely used because it is not neatly packaged.

Step 1. compile the C library - you may need sudo apt-get install autotools-dev autoconf automake libtool

mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install

Step 2. Install the Python library

pip install pylibrabbitmq
堇色安年 2024-08-26 23:14:48

您无法同步执行此操作,因为它是异步系统。但你可以使用线程来解决这个问题。

基本思想是启动一个在通道上等待的线程,每当它退出等待时,它都会为返回消息队列中的任何返回消息调用 call_back 函数。
然后,您可以在 call_back 函数中按照您想要的方式处理该消息

def registerCallback(channel, call_back):
    """ 此方法设置一个线程,用于处理无法由交换器路由的消息的异步回调。
    ”“”
    定义等待():
        尝试:
            通道.wait()
        除了异常,e:
            print("发布通道等待时出现问题:%s" % str(e))

        而不是channel.returned_messages.empty():
            returnedMessage = Channel.returned_messages.get()
            processReturnedMessageThread = 线程(target=call_back, args=(returnedMessage))
            processReturnedMessageThread.start()

        等待()

    等待=线程(目标=等待) 
    等待.start()

You can't do this synchronously as it is an asynchronous system. But you can solve this problem using threads.

The basic idea is that you start a thread which does the wait on the channel, whenever it comes out of the wait it calls the call_back function for any returned message in the returned message queue.
You can then deal with that message however you want to in the call_back function

def registerCallback(channel, call_back):
    """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange.
    """
    def wait():
        try:
            channel.wait()
        except Exception, e:
            print("Problem waiting on publish channel: %s" % str(e))

        while not channel.returned_messages.empty():
            returnedMessage = channel.returned_messages.get()
            processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage))
            processReturnedMessageThread.start()

        wait()

    waiting = Thread(target=wait) 
    waiting.start()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文