将大量消息发布到AMQP队列
使用 Ruby amqp 库的 v0.7.1 和 Ruby 1.8.7,我尝试将大量(数百万)短(~40 字节)消息发布到 RabbitMQ 服务器。我的程序的主循环(好吧,不是真正的循环,但仍然)看起来像这样:
AMQP.start(:host => '1.2.3.4',
:username => 'foo',
:password => 'bar') do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.topic("foobar", {:durable => true})
i = 0
EM.add_periodic_timer(1) do
print "\rPublished #{i} commits"
end
results = get_results # <- Returns an array
processor = proc do
if x = results.shift then
exchange.publish(x, :persistent => true,
:routing_key => "test.#{i}")
i += 1
EM.next_tick processor
end
end
EM.next_tick(processor)
AMQP.stop {EM.stop} end
代码开始处理结果数组很好,但过了一会儿(通常是在 12k 消息之后左右)它就会终止,并出现以下错误
/Library/Ruby/Gems/1.8/gems/amqp-0.7.1/lib/amqp/channel.rb:807:in `send':
The channel 1 was closed, you can't use it anymore! (AMQP::ChannelClosedError)
No messages都存储在队列中。该错误似乎是在从程序到队列服务器的网络活动开始时发生的。
我做错了什么?
Using v0.7.1 of the Ruby amqp library and Ruby 1.8.7, I am trying to post a large number (millions) of short (~40 bytes) messages to a RabbitMQ server. My program's main loop (well, not really a loop, but still) looks like this:
AMQP.start(:host => '1.2.3.4',
:username => 'foo',
:password => 'bar') do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.topic("foobar", {:durable => true})
i = 0
EM.add_periodic_timer(1) do
print "\rPublished #{i} commits"
end
results = get_results # <- Returns an array
processor = proc do
if x = results.shift then
exchange.publish(x, :persistent => true,
:routing_key => "test.#{i}")
i += 1
EM.next_tick processor
end
end
EM.next_tick(processor)
AMQP.stop {EM.stop} end
The code starts processing the results array just fine, but after a while (usually, after 12k messages or so) it dies with the following error
/Library/Ruby/Gems/1.8/gems/amqp-0.7.1/lib/amqp/channel.rb:807:in `send':
The channel 1 was closed, you can't use it anymore! (AMQP::ChannelClosedError)
No messages are stored on the queue. The error seems to be happening just when network activity from the program to the queue server starts.
What am I doing wrong?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
第一个错误是您没有发布您正在使用的 RabbitMQ 版本。许多人正在运行旧的过时版本 1.7.2,因为这是他们的操作系统包存储库中的版本。对于像您这样发送大量消息的人来说,这是一个糟糕的举动。从 RabbitMQ 站点本身获取 RabbitMQ 2.5.1 并删除默认的系统包。
第二个错误是您没有告诉我们 RabbitMQ 日志中的内容。
第三个错误是您没有提及消耗消息的内容。是否有另一个进程在某处运行,该进程已声明队列并将其绑定到交换器。除非有人将消息队列声明给 RabbitMQ 并将其绑定到交换器,否则没有消息队列。即使这样,只有当队列的绑定键与您发布的路由键匹配时,消息才会流动。
第四个错误。您混淆了路由密钥和绑定密钥。路由键是一个字符串,例如 topic.test.json.echos,绑定键(用于将队列绑定到交换器)是一个模式,例如 topic.# 或 topic..json。
< em>在您澄清后更新
关于版本,我不确定它何时被修复,但 1.7.2 中存在一个问题,大量持久消息导致 RabbitMQ 在滚动其持久日志时崩溃,并且崩溃后无法重新启动,直到有人手动取消翻转。
当您说正在打开和关闭连接时,我希望它不是每条消息。这将是一种奇怪的使用 AMQP 的方式。
让我重复一遍。生产者不将消息写入队列。它们将消息写入交换器,然后交换器根据路由键(字符串)和队列的绑定键(模式)将消息路由到队列。在您的示例中,我误读了 # 符号的使用,但我没有看到任何声明队列并将其绑定到交换器的内容。
First mistake is that you didn't post the RabbitMQ version that you are using. Lots of people are running old obsolete version 1.7.2 because that is what is in their OS package repositories. Bad move for anyone sending the volume of messages that you are. Get RabbitMQ 2.5.1 from the RabbitMQ site itself and get rid of your default system package.
Second mistake is that you did not tell us what is in the RabbitMQ logs.
Third mistake is that you said nothing about what is consuming the messages. Is there another process running somewhere that has declared a queue and bound it to the exchange. There is NO message queue unless somebody declares it to RabbitMQ and binds it to an exchange. Even then messages will only flow if the binding key for the queue matches the routing key that you publish with.
Fourth mistake. You have routing keys and binding keys mixed up. The routing key is a string such as topic.test.json.echos and the binding key (used to bind a queue to an exchange) is a pattern like topic.# or topic..json.
Updated after your clarifications
Regarding versions, I'm not sure when it was fixed but there was a problem in 1.7.2 with large numbers of persistent messages causing RabbitMQ to crash when it rolled over its persistence log, and after crashing it was unable to restart until someone manually undid the rollover.
When you say that a connection is being opened and closed, I hope that it is not per message. That would be a strange way to use AMQP.
Let me repeat. Producers do NOT write messages to queues. They write messages to exchanges which then route the messages to queues based on the routing key (string) and the queue's binding key (pattern). In your example I misread the use of the # sign, but I see nothing which declares a queue and binds it to the exchange.