EventMachine EM::Iterator 被rabbitmq RPC 阻塞

发布于 2024-12-25 05:28:22 字数 2630 浏览 1 评论 0原文

我正在尝试设置 RabbitMQ rpc。我想要一个队列来侦听,当它收到一条消息时,我希望它回复通过带有多条消息的reply_to标头指定的匿名队列。

我有以下 thor 任务创建一个队列,然后使用 EM:Iterator 将许多消息发送回使用replyt_to路由键指定的队列:

desc "start_consumer", "start the test consumer"
def start_consumer
  conf = {
    :host => "localhost", 
    :user => "guest", 
    :password => "guest", 
    :vhost => "/", 
    :logging => true, 
    :port => 5672
  }

  # n = 1

  AMQP.start(conf) do |connection|

    channel = AMQP::Channel.new(connection)

    requests_queue = channel.queue("one")
    requests_queue.purge

    Signal.trap("INT") do
      connection.close do
        EM.stop{exit}
      end
    end

    channel.prefetch(1)

    requests_queue.subscribe(:ack => true) do |header, body|
      url_search = MultiJson.decode(body)

      EM::Iterator.new(0..5).each do |n, iter|
        lead = get_lead(n, (n == 5))

        puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}"

        AMQP::Exchange.default.publish(
                                        MultiJson.encode(lead), 
                                        :immediate => true,
                                        :routing_key => header.reply_to,
                                        :correlation_id => header.correlation_id
                                      )  

        iter.next
      end
    end

    puts " [x] Awaiting RPC requests"
  end          
end

下面的代码将一条消息发送到上面指定的队列,并创建一个队列,该队列将用于监听 EM::Iterator 代码发送的消息。该队列的名称是第一个队列的reply_to标头的路由键。

def publish(urlSearch, routing_key)
  EM.run do
    corr_id = rand(10_000_000).to_s

    requests ||= Hash.new

    connection = AMQP.connect(:host => "localhost")

    callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false)

    callback_queue.subscribe do |header, body|  
      lead = safe_json_decode(body)

      puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"              

      if lead["is_last"]
        puts "in exit"
        connection.close do
          EM.stop{exit}
        end
      end
    end

    callback_queue.append_callback(:declare) do
      AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
    end

    puts "initial message sent"
  end
end

上面的代码按照我想要的方式工作,但有一个令人讨厌的异常。有些东西阻止了 EM::Iterator 代码的异步执行。只有在 EM::Iterator 代码完成后才会发送消息。我希望消息异步发送,并在每次迭代后由匿名队列处理。目前,只有在 EM::Iterator 代码完成最后一次迭代之后,才会发送所有消息。

任何人都可以看到我做错了什么或建议不同的方法吗?我尝试了 EM::defer 并有相同的行为。

I am trying to set up RabbitMQ rpc. I want one queue to listen, and when it receives a message I want it to reply to an anonymous queue that is specified via the reply_to header with multiple messages.

I have the following thor task creates a queue and then uses EM:Iterator to send a number of messages back to the queue specified with the replyt_to routing key:

desc "start_consumer", "start the test consumer"
def start_consumer
  conf = {
    :host => "localhost", 
    :user => "guest", 
    :password => "guest", 
    :vhost => "/", 
    :logging => true, 
    :port => 5672
  }

  # n = 1

  AMQP.start(conf) do |connection|

    channel = AMQP::Channel.new(connection)

    requests_queue = channel.queue("one")
    requests_queue.purge

    Signal.trap("INT") do
      connection.close do
        EM.stop{exit}
      end
    end

    channel.prefetch(1)

    requests_queue.subscribe(:ack => true) do |header, body|
      url_search = MultiJson.decode(body)

      EM::Iterator.new(0..5).each do |n, iter|
        lead = get_lead(n, (n == 5))

        puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}"

        AMQP::Exchange.default.publish(
                                        MultiJson.encode(lead), 
                                        :immediate => true,
                                        :routing_key => header.reply_to,
                                        :correlation_id => header.correlation_id
                                      )  

        iter.next
      end
    end

    puts " [x] Awaiting RPC requests"
  end          
end

The code beloow sends a message to the queue specified above and also creates a queue that will be used to listen for the messages sent by the EM::Iterator code. This queue's name is the routing key for the first queues reply_to header.

def publish(urlSearch, routing_key)
  EM.run do
    corr_id = rand(10_000_000).to_s

    requests ||= Hash.new

    connection = AMQP.connect(:host => "localhost")

    callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false)

    callback_queue.subscribe do |header, body|  
      lead = safe_json_decode(body)

      puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"              

      if lead["is_last"]
        puts "in exit"
        connection.close do
          EM.stop{exit}
        end
      end
    end

    callback_queue.append_callback(:declare) do
      AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
    end

    puts "initial message sent"
  end
end

The above code works as I want with one annoying exception. Something is blocking the EM::Iterator code from being executed asynchronously. It is only after the EM::Iterator code has completed that the messages are sent. I want the messages to be sent asynchronously and handled by the anonymous queue after each iteration. At the moment, it is only after the EM::Iterator code has completed its last iteration that all the messages are sent.

Can anyone see what I am doing wrong or suggest a different approach? I tried EM::defer and had the same behaviour.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

花开雨落又逢春i 2025-01-01 05:28:22

启动一个新线程是我问题的答案:

Thread.new do
  5.times do
    lead = get_lead(n, (n == 5))

    puts "message #{n} is_last = #{lead.is_last} at #{Time.now}";

    AMQP::Exchange.default.publish(
                                    MultiJson.encode(lead), 
                                    :routing_key => header.reply_to,
                                    :correlation_id => header.correlation_id
                                  )

    n += 1
    sleep(2)
  end
end

创建一个新线程会阻止 EventMachine 反应器被阻塞,并且消息会异步发送。

Spinning up a new thread was the answer to my problem:

Thread.new do
  5.times do
    lead = get_lead(n, (n == 5))

    puts "message #{n} is_last = #{lead.is_last} at #{Time.now}";

    AMQP::Exchange.default.publish(
                                    MultiJson.encode(lead), 
                                    :routing_key => header.reply_to,
                                    :correlation_id => header.correlation_id
                                  )

    n += 1
    sleep(2)
  end
end

Creating a new thread stops the EventMachine reactor being blocked and the messages are sent async.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文