我正在使用 RabbitMQ 使用 PHP AMQP 扩展 < 为搜索索引器进程实现工作任务队列< /a>.我需要搜索索引器恶魔来侦听队列上的消息并在可用时使用它们。
我看到两种从队列中消费内容的方法:
- AMQPQueue::get - 不会阻塞,所以可能不是我想要的
- AMQPQueue::consume - 看起来很有希望
但是,使用 Consumer 似乎设置了一个不是当时的消费者已删除。这是 PHP:
$opts = array('min' => 1, 'max' => 10, 'ack' => false);
$messages = array();
while (count($messages) or $messages = $q->consume($opts)) {
$msg = array_pop($messages);
var_dump($msg);
// ...Do work here...
$q->ack($msg['delivery_tag']);
}
您可以看到使用rabbitmqctl 构建的消费者:
[andrew@localhost ~] rabbitmqctl list_queues name consumers
Listing queues ...
test_queue 3
[andrew@localhost ~] rabbitmqctl list_queues name consumers
Listing queues ...
test_queue 4
所以问题是,将 PHP 守护程序绑定到队列的正确方法是什么,以便它在等待消息可用时阻塞,然后开始阻塞/当它完成与每个消息批次相关的工作时再次监听?
I am using RabbitMQ to implement a worker task queue for a search indexer process using the PHP AMQP extension. I need the search indexer demon to listen for messages on the queue and consume them when it's available.
I see two methods for consuming content from a queue:
- AMQPQueue::get - doesn't block, so probably not what I'm after
- AMQPQueue::consume - seems promising
However, using consume appears to set up a consumer that is not then removed. Here's the PHP:
$opts = array('min' => 1, 'max' => 10, 'ack' => false);
$messages = array();
while (count($messages) or $messages = $q->consume($opts)) {
$msg = array_pop($messages);
var_dump($msg);
// ...Do work here...
$q->ack($msg['delivery_tag']);
}
And you can see the consumers building up using rabbitmqctl:
[andrew@localhost ~] rabbitmqctl list_queues name consumers
Listing queues ...
test_queue 3
[andrew@localhost ~] rabbitmqctl list_queues name consumers
Listing queues ...
test_queue 4
So the question is, what is the correct way to bind a PHP daemon to a queue such that it blocks while it waits for messages to be available, and starts blocking/listening again when it has completed the work associated with each message batch?
发布评论
评论(2)
我不确定 PHP Pecl 扩展如何实现消费者,但是我的 Amqp 库允许您通过调用函数来侦听传入消息(即消费),并且有几种可用的“退出策略”如果你不想永远阻止。 此处提供了文档,请查看“实现消费者”部分,以及 演示脚本在这里。
I'm not sure how the PHP Pecl extension implements consumers, but my Amqp library allows you to listen out for incoming messages (i.e. consume) by calling a function, and there are several "exit strategies" available in case you don't want to block forever. There's documentation available here, check the section "Implementing a Consumer", and a demo script here.
消费就是你想要的。它会阻塞直到收到消息。
自您的代码以来,API 已经发生了很大的变化,因此很难猜测出了什么问题。
http://www.php.net/manual/en/amqpqueue.consume.php
有半最新的文档和示例
consume is what you want. It'll block until it receives a message.
The API has changed a big since your code, so it's hard to guess what went wrong.
http://www.php.net/manual/en/amqpqueue.consume.php
has the semi latest documentation and example