使用 AMQP、Rabbit 和 Ruby 刷新队列
我正在用 Ruby 开发一个系统,该系统将利用 RabbitMQ 在执行某些工作时将消息发送到队列。我正在使用:
- Ruby 1.9.1 Stable
- RabbitMQ 1.7.2
- AMQP gem v0.6.7 (http://github.com /tmm1/amqp)
我在这个 gem 上看到的大多数示例都在 EM.add_periodic_timer 块中进行发布调用。这不适用于我怀疑的绝大多数用例,当然也不适用于我的用例。我需要在完成一些工作时发布一条消息,因此将发布语句放在 add_periodic_timer 块中是不够的。
因此,我试图弄清楚如何将一些消息发布到队列,然后“刷新”它,以便我发布的任何消息都会传递给我的订阅者。
为了让您了解我的意思,请考虑以下发布者代码:
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
MESSAGES = ["hello","goodbye","test"]
AMQP.start do
queue = MQ.queue('testq')
messages_published = 0
while (messages_published < 50)
if (rand() < 0.4)
message = MESSAGES[rand(MESSAGES.size)]
puts "#{Time.now.to_s}: Publishing: #{message}"
queue.publish(message)
messages_published += 1
end
sleep(0.1)
end
AMQP.stop do
EM.stop
end
end
因此,此代码只是循环,在循环的每次迭代中以 40% 的概率发布消息,然后休眠 0.1 秒。它会执行此操作,直到发布 50 条消息,然后停止 AMQP。当然,这只是一个概念证明。
现在,我的订阅者代码:
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
AMQP.start do
queue = MQ.queue('testq')
queue.subscribe do |header, msg|
puts "#{Time.now.to_s}: Received #{msg}"
end
end
因此,我们只需订阅队列,对于收到的每条消息,我们都会将其打印出来。
很好,只不过订阅者仅在发布者调用 AMQP.stop 时收到全部 50 条消息。
这是我的出版商的输出。为了简洁起见,它在中间被截断:
$ ruby publisher.rb
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye
接下来是我的订阅者的输出:
$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
如果您注意到输出中的时间戳,订阅者只会在发布者停止 AMQP 并退出后收到所有消息。
那么,作为 AMQP 新手,我怎样才能让我的消息立即传递呢?我尝试将 AMQP.start 和 AMQP.stop 放入发布者的 while 循环体中,但随后仅传递第一条消息 - 但奇怪的是,如果我打开日志记录,服务器不会报告任何错误消息,并且消息确实被发送到队列,但订阅者永远不会收到。
如有建议,我们将不胜感激。感谢您的阅读。
I'm developing a system in Ruby that is going to make use of RabbitMQ to send messages to a queue as it does some work. I am using:
- Ruby 1.9.1 Stable
- RabbitMQ 1.7.2
- AMQP gem v0.6.7 (http://github.com/tmm1/amqp)
Most of the examples I have seen on this gem have their publish calls in an EM.add_periodic_timer block. This doesn't work for what I suspect is a vast majority of use cases, and certainly not for mine. I need to publish a message as I complete some work, so putting a publish statement in an add_periodic_timer block doesn't suffice.
So, I'm trying to figure out how to publish a few messages to a queue, and then "flush" it, so that any messages I've published are then delivered to my subscribers.
To give you an idea of what I mean, consider the following publisher code:
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
MESSAGES = ["hello","goodbye","test"]
AMQP.start do
queue = MQ.queue('testq')
messages_published = 0
while (messages_published < 50)
if (rand() < 0.4)
message = MESSAGES[rand(MESSAGES.size)]
puts "#{Time.now.to_s}: Publishing: #{message}"
queue.publish(message)
messages_published += 1
end
sleep(0.1)
end
AMQP.stop do
EM.stop
end
end
So, this code simply loops, publishing a message with 40% probability on each iteration of the loop, and then sleeps for 0.1 seconds. It does this until 50 messages have been published, and then stops AMQP. Of course, this is just a proof of concept.
Now, my subscriber code:
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
AMQP.start do
queue = MQ.queue('testq')
queue.subscribe do |header, msg|
puts "#{Time.now.to_s}: Received #{msg}"
end
end
So, we just subscribe to the queue, and for each message received, we print it out.
Great, except that the subscriber only receives all 50 messages when the publisher calls AMQP.stop.
Here's the output from my publisher. It has been truncated in the middle for brevity:
$ ruby publisher.rb
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye
Next, the output from my subscriber:
$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
If you note the timestamps in the output, the subscriber only receives all of the messages once the publisher has stopped AMQP and exited.
So, being an AMQP newb, how can I get my messages to deliver immediately? I tried putting AMQP.start and AMQP.stop in the body of the while loop of the publisher, but then only the first message gets delivered -- though strangely, if I turn on logging, there are no error messages reported by the server and the messages do get sent to the queue, but never get received by the subscriber.
Suggestions would be much appreciated. Thanks for reading.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
如果有人想要了解此问题的信息,请参阅 http://groups.google .com/group/ruby-amqp/browse_thread/thread/311965bcf9697ece
我通过在发布者代码中添加额外的线程解决了该问题:
For anyone wanting information on this issue, see http://groups.google.com/group/ruby-amqp/browse_thread/thread/311965bcf9697ece
I fixed the problem by adding an additional thread within my publisher code: