订阅队列,接收 1 条消息,然后取消订阅
我有一个场景,我需要非常快速地分发和处理作业。我将快速将大约 45 个作业填充到队列中,并且可以同时处理大约 20 个作业(5 台机器,每台 4 个核心)。每个作业花费的时间各不相同,并且使问题变得复杂的是垃圾收集是一个问题,因此我需要能够使消费者离线以进行垃圾收集。
目前,我的一切都与 pop 相关(每个消费者每 5 毫秒弹出一次)。这似乎是不可取的,因为它意味着每秒向rabbitmq 发送 600 个 pop 请求。
如果有一个弹出命令可以像订阅一样操作,但只适用于一条消息,我会很高兴。 (该过程会阻塞,等待来自rabbitMQ连接的输入,通过类似于Kernel.select的东西)
我试图欺骗AMQP gem做这样的事情,但它不起作用:我似乎无法取消订阅,直到队列为空,没有更多消息发送给消费者。我担心其他取消订阅的方法会丢失消息。
Consumer_1.rb :
require "amqp"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
queue.unsubscribe { puts "unbound" }
sleep 3
end
end
Consumer_many.rb:
require "amqp"
# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
end
end
Producer.rb:
require "amqp"
i = 0
EventMachine.run do
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
EM.add_periodic_timer(1) do
msg = "Message #{i}"
i+=1
puts "~ publishing #{msg}"
end
end
我将启动 Consumer_many.rb 和 Producer.rb。消息将按预期流动。
当我启动 Consumer_1.rb 时,它会获取所有其他消息(如预期)。但它永远不会取消订阅,因为它永远不会完成所有消息的处理……如此下去。
如何让 Consumer_1.rb 订阅队列,获取一条消息,然后将其自身从负载均衡器环中取出,以便它可以完成其工作,而不会丢失队列中可能存在的任何其他待处理作业否则要发送到预定的进程吗?
蒂姆
I have a scenario where I need to distribute and process jobs extremely quickly. I'll have about 45 jobs populated in the queue quickly and I can process about 20 simultaneously (5 machines, 4 cores each). Each job takes a variable amount of time, and to complicate matters garbage collection is an issue so I need to be able to take a consumer offline for garbage collection.
Currently, I have everything working with pop (every consumer pops every 5ms). This seems undesirable because it translates to 600 pop requests per second to rabbitmq.
I would love if it there were a pop command that would act like subscribe, but for only one message. (the process would block, waiting for input from the rabbitMQ connection, via something akin to Kernel.select)
I have tried to trick the AMQP gem to doing something like this, but it's not working: I can't seem to unsubscribe until the queue is empty and no more messages are being sent to the consumer. Other methods of unsubscribing I fear will lose messages.
consume_1.rb :
require "amqp"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
queue.unsubscribe { puts "unbound" }
sleep 3
end
end
consumer_many.rb:
require "amqp"
# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
end
end
producer.rb:
require "amqp"
i = 0
EventMachine.run do
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
EM.add_periodic_timer(1) do
msg = "Message #{i}"
i+=1
puts "~ publishing #{msg}"
end
end
I'll launch consume_many.rb and producer.rb. Messages will flow as expected.
When I launch consume_1.rb, it gets every other message (as expected). But it NEVER unsubscribes because it never finishes processing all of its messages... so on it goes.
How do I get consume_1.rb to subscribe to the queue, get a single message, and then take itself out of the load-balancer ring so it can do it's work, without losing any additional pending jobs that might be in the queue and would otherwise be scheduled to be sent to the process?
Tim
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
这是 AMQP gem 的一个简单但记录很少的功能,您需要的是:
在您的消费者中:
然后使用您的订阅块,执行
以下操作: 它告诉 RabbitMQ 一次只向您的消费者发送 1 条消息,并且在处理长时间运行的任务一段时间后对队列头调用
ack
之前,不会发送另一条消息。我可能需要纠正这一点,但我相信
直接
交换更适合这项任务。This is a simple, yet very poorly documented feature of the AMQP gem, what you need is this:
In your consumer:
And then with your subscribe block, do:
What this does it tells RabbitMQ to only send your consumer 1 message at a time, and not send another message until you call
ack
on the queue header after working on a long running task for a while.I might need to be corrected on this, but I believe a
direct
exchange would be better suited to this task.在我所拥有的环境中,
RabbitMQ版本:3.3.3
amqp gem版本:1.5.0
Ivan的解决方案仍然导致从队列中获取所有消息。
相反,可以通过设置通道的 QoS 来限制订阅队列的未确认消息的数量。
根据 AMQP::Channel 的 API 文档,
该方法的一个注释是,如果您在 2.3.6 版本之后运行 RabbitMQ 服务器,则 prefetch_size 已弃用。
希望该解决方案可以帮助某人。
干杯。
With the environment that I have,
RabbitMQ version: 3.3.3
amqp gem version: 1.5.0
The solution from Ivan still resulted in all the messages being fetched from the queue.
Instead, the number of unacknowledged messages by subscribing to a queue can be limited by setting the QoS of a channel.
As per the API document of AMQP::Channel,
One note for the method in that, if you are running RabbitMQ servers after version 2.3.6, prefetch_size is deprecated.
Hope the solution helps someone out.
Cheers.