How to make this AMQP single-message subscriber stable?

Posted 2024-12-08 09:53:40

As part of a larger application, I need to set up some basic rate limiting of outgoing requests across multiple workers. The idea is rather simple: a producer publishes "token" messages with the "immediate" flag, so a token is automatically discarded if nobody is waiting for it. Because workers subscribe to the token queue only just before sending an outgoing request, tokens are not "saved up", and each token can be used only once. I thought this rather elegant.
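
To make the intended semantics concrete, here is a minimal in-memory sketch with no broker involved. `TokenGate`, `wait_for_token` and `publish` are hypothetical names invented for this illustration; they only model "deliver to a waiting worker or discard", not what RabbitMQ or the amqp gem do internally.

```ruby
# In-memory model of the intended semantics; no broker involved.
# TokenGate and its methods are hypothetical names for illustration.
class TokenGate
  attr_reader :dropped

  def initialize
    @waiters = []   # callbacks of workers currently waiting for a token
    @dropped = 0    # tokens discarded because nobody was waiting
  end

  # A worker registers interest just before making an outgoing request.
  def wait_for_token(&callback)
    @waiters << callback
  end

  # Models publishing with "immediate": hand the token to the longest-waiting
  # worker, or discard it when there is none. Tokens are never stored.
  def publish(token)
    waiter = @waiters.shift
    if waiter
      waiter.call(token)
      true
    else
      @dropped += 1
      false
    end
  end
end

gate = TokenGate.new
received = []

gate.publish("msg 0")                      # nobody waiting: dropped
gate.wait_for_token { |t| received << t }  # one worker starts waiting
gate.publish("msg 1")                      # delivered to that worker
gate.publish("msg 2")                      # worker already served: dropped

puts "received=#{received.inspect} dropped=#{gate.dropped}"
```

The worker gets exactly one token, and tokens published while nobody is waiting are lost rather than queued, which is the property the rate limiter relies on.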

Unfortunately, adding and removing subscribers is not entirely stable. I've set up a full example at https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733. The code is below:

require 'bundler'
Bundler.setup

require 'amqp'

puts "single-message consumer listening to rapid producer"

QUEUE_NAME   = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9

def start_producer
  exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "") 

  n = 0
  EM::PeriodicTimer.new(PRODUCE_RATE) do
    message = "msg #{n}"
    exchange.publish(message,
                     :immediate   => true, # IMPORTANT, messages are dropped if nobody listening now
                     :routing_key => QUEUE_NAME)
    puts "> PUT #{message}"
    n += 1
  end
end

def start_consumer

  EM::PeriodicTimer.new(CONSUME_RATE) do

    started = Time.now
    AMQP::Channel.new do |channel_consumer|
      channel_consumer.prefetch(1)
      tick_queue = channel_consumer.queue(QUEUE_NAME)

      consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true)
      consumer.on_delivery do |_, message|

        took = Time.now - started
        puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"

        consumer.cancel
        channel_consumer.close
      end
      consumer.consume
    end
  end
end

EM.run do
  EM.set_quantum(50)

  start_producer
  start_consumer
end

Running that example for a few minutes ends up dying with one of two errors:

  1. amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in
    <class:Consumer>': undefined method `handle_delivery' for
    nil:NilClass (NoMethodError)

  2. amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in
    `send_frame': Trying to send frame through a closed connection.
    Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0
    @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02",
    @channel=1> (AMQ::Client::ConnectionClosedError)

The first error is due to the subscriber having been removed, but a message is still delivered to it, and the amq-client library never expects this to happen. The second error is from the publisher, which all of a sudden has a closed connection.
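
The shape of that first failure can be reproduced without a broker. The sketch below is a hypothetical dispatcher, not amq-client's actual code: it assumes deliveries are routed by looking up a handler under a consumer tag, and shows what happens when a message that was already in flight arrives after the handler has been removed.

```ruby
# Hypothetical dispatcher, not amq-client's actual code: it only mirrors the
# shape of the failure (a delivery looked up by consumer tag after removal).
class Dispatcher
  def initialize
    @consumers = {}
  end

  def register(tag, &handler)
    @consumers[tag] = handler
  end

  def cancel(tag)
    @consumers.delete(tag)
  end

  # Unguarded lookup: raises NoMethodError when the tag is already gone.
  def deliver_unguarded(tag, message)
    @consumers[tag].call(message)
  end

  # Guarded lookup: a late delivery for a removed consumer is ignored.
  def deliver_guarded(tag, message)
    handler = @consumers[tag]
    return false unless handler
    handler.call(message)
    true
  end
end

dispatcher = Dispatcher.new
seen = []
dispatcher.register("worker-1") { |m| seen << m }

dispatcher.deliver_unguarded("worker-1", "msg 0")  # normal delivery
dispatcher.cancel("worker-1")                      # consumer removed locally

# A message that was already in flight now arrives for the removed consumer.
late_error =
  begin
    dispatcher.deliver_unguarded("worker-1", "msg 1")
    nil
  rescue NoMethodError => e
    e
  end

handled = dispatcher.deliver_guarded("worker-1", "msg 1")
puts "unguarded raised: #{late_error.class}, guarded handled: #{handled}"
```

With a producer publishing ten tokens a second, such a late delivery is likely sooner or later, which would fit the example dying only after a few minutes.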

What am I missing to make this consistently work as expected?

Versions used:

  • OS X 10.7.1
  • ruby 1.9.2p312 (2011-08-11 revision 32926) [x86_64-darwin11.1.0]
  • RabbitMQ 2.6.1

Gemfile:

source 'http://rubygems.org'

gem 'amqp'

Gemfile.lock:

GEM
  remote: http://rubygems.org/
  specs:
    amq-client (0.8.3)
      amq-protocol (>= 0.8.0)
      eventmachine
    amq-protocol (0.8.1)
    amqp (0.8.0)
      amq-client (~> 0.8.3)
      amq-protocol (~> 0.8.0)
      eventmachine
    eventmachine (0.12.10)

PLATFORMS
  ruby

DEPENDENCIES
  amqp
  eventmachine


Comments (1)

萌吟 2024-12-15 09:53:40

From the #rabbitmq channel (amqp author antares_): just use a single channel for the consumers instead of opening and closing a new one for every token, and it'll work fine. A slightly changed, but stable version:

require 'bundler'
Bundler.setup

require 'amqp'

puts "single-message consumer listening to rapid producer"

QUEUE_NAME   = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9

def start_producer channel
  exchange = AMQP::Exchange.new(channel, :direct, "") 

  n = 0
  EM::PeriodicTimer.new(PRODUCE_RATE) do
    message = "msg #{n}"
    exchange.publish(message,
                     :immediate   => true, # IMPORTANT, messages are dropped if nobody listening now
                     :routing_key => QUEUE_NAME)
    puts "> PUT #{message}"
    n += 1
  end
end

def start_consumer channel
  EM::PeriodicTimer.new(CONSUME_RATE) do

    started = Time.now
    tick_queue = channel.queue(QUEUE_NAME)

    # Positional arguments: consumer_tag (nil), exclusive (false), no_ack (true)
    consumer = AMQP::Consumer.new(channel, tick_queue, nil, false, true)
    consumer.on_delivery do |_, message|

      took = Time.now - started
      puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"

      consumer.cancel do
        puts "< GET #{message} (CANCEL DONE)"
      end
    end
    consumer.consume
  end
end

EM.run do
  EM.set_quantum(50)

  AMQP::Channel.new do |channel|
    start_producer channel
  end

  AMQP::Channel.new do |channel|
    channel.prefetch(1)
    start_consumer channel
  end

end
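
As a rough sanity check on the throughput to expect from these rates, here is a broker-free simulation of the timeline. It is a sketch under stated assumptions: delivery is instantaneous, a token goes to any waiting subscriber, and when a subscription and a token fall on the same instant the subscription is processed first. All variable names are made up for the illustration.

```ruby
# Time is counted in ticks of 1/90 s so both periods are exact integers:
# a token every 9 ticks (1/10 s), a new subscription every 10 ticks (1/9 s).
ticks_per_second = 90
produce_every    = 9
consume_every    = 10
seconds          = 10
duration         = seconds * ticks_per_second

waiting  = 0  # subscribers currently waiting for a token
consumed = 0  # tokens handed to a subscriber
dropped  = 0  # tokens published while nobody was waiting

duration.times do |tick|
  # Assumption: when both fall on the same tick, the subscription comes first.
  waiting += 1 if (tick % consume_every).zero?

  next unless (tick % produce_every).zero?

  if waiting > 0
    waiting  -= 1
    consumed += 1
  else
    dropped += 1
  end
end

puts "consumed=#{consumed} dropped=#{dropped} waiting=#{waiting}"
puts "effective rate: #{consumed / seconds.to_f} reqs/sec"
```

Under these assumptions, 100 tokens are published in 10 seconds, 90 are consumed and 10 are discarded, so the workers settle at their own 9 requests per second and roughly one token in ten is dropped.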