AMQP动态创建订阅队列

发布于 2024-11-25 12:43:13 字数 4438 浏览 2 评论 0原文

我正在尝试使用 AMQP、Websockets 和 Ruby 构建一个简单的聊天应用程序。我知道这可能不是理解 AMQP 的最佳用例,但我想了解我哪里出错了。

以下是我的 amqp-server 代码,

require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'

class MessageParser
  # message format => "room:harry_potter, nickname:siddharth, room:members"
  def self.parse(message)
    parsed_message = JSON.parse(message)

    response = {}
    if parsed_message['status'] == 'status'
      response[:status] = 'STATUS'
      response[:username] = parsed_message['username']
      response[:roomname] = parsed_message['roomname']
    elsif parsed_message['status'] == 'message'
      response[:status]   = 'MESSAGE'
      response[:message]  = parsed_message['message']
      response[:roomname] = parsed_message['roomname'].split().join('_')
    end

    response
  end
end

class MongoManager
  def self.establish_connection(database)
    @db ||= Mongo::Connection.new('localhost', 27017).db(database)
    @db.collection('rooms')

    @db
  end  
end


@sockets = []
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)

  puts "Connected to AMQP broker. #{AMQP::VERSION} "

  mongo = MongoManager.establish_connection("trackertalk_development")

  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}
    ws.onopen do 
      @sockets << socket_detail

    end

    ws.onmessage do |message|

      status  = MessageParser.parse(message)         
      exchange = channel.fanout(status[:roomname].split().join('_'))   

      if status[:status] == 'STATUS'               
         queue = channel.queue(status[:username], :durable => true)

        unless queue.subscribed? 
         puts "--------- SUBSCRIBED --------------"
         queue.bind(exchange).subscribe do |payload|
            puts "PAYLOAD :  #{payload}"
            ws.send(payload)
          end 
        else
          puts "----ALREADY SUBSCRIBED"
        end                  

        # only after 0.8.0rc14
        #queue = channel.queue(status[:username], :durable => true)      
        #AMQP::Consumer.new(channel, queue)        

      elsif status[:status] == 'MESSAGE'
        puts "********************* Message- published ******************************"
        exchange.publish(status[:message)  
      end                  
    end

    ws.onclose do 
      @sockets.delete ws
    end
  end    
end

我使用状态来指示传入消息是用于正在进行的聊天的消息还是需要我处理诸如订阅队列之类的杂务的状态消息。

我面临的问题是,当我发送如下消息时 socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

调用了 exchange.publish' 但它仍然存在不会通过ws.send` 推送到浏览器。

我对 EventMachine 和 AMQP 的理解有根本性的错误吗?

这是相同代码的粘贴 http://pastie.org/private/xosgb8tw1w5vuroa4w7a

我的代码似乎当我删除耐用 => 时,可以按预期工作true from queue = channel.queue(status[:username], :durable => true)

下面是我的 Rails 视图的片段,它标识了用户的用户名和房间名通过 Websockets 将其作为消息的一部分发送。

虽然当我删除 Durable => 时代码似乎可以工作true 我不明白为什么这会影响正在传递的消息。请忽略 mongo 部分,因为它还没有发挥任何作用。

我还想知道我的 AMQP 方法及其用法是否正确

<script>
    $(document).ready(function(){
        var username = '<%= @user.email %>';
        var roomname = 'Bazingaa';

        socket = new WebSocket('ws://127.0.0.1:8080/');

        socket.onopen = function(msg){
            console.log('connected');
            socket.send(JSON.stringify({status:'status', username:username, roomname:roomname}));
        }

        socket.onmessage = function(msg){
            $('#chat-log').append(msg.data);

        }

    });

</script>
<div class='block'>
  <div class='content'>
    <h2 class='title'><%= @room.name %></h2>
    <div class='inner'>
      <div id="chat-log">
      </div>

      <div id="chat-console">
        <textarea rows="5" cols="40"></textarea>
      </div>
    </div>
  </div>
</div>

<style>
    #chat-log{
        color:#000;
        font-weight:bold;
        margin-top:1em;
        width:900px;
        overflow:auto;
        height:300px;
    }
    #chat-console{
        bottom:10px;
    }

    textarea{
        width:100%;
        height:60px;
    }
</style>

I am trying to build a simple chat application using AMQP, Websockets and Ruby. I understand that this may not be the best use-case to understand AMQP but I would like to understand where i am going wrong.

The following is my amqp-server code

require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'

class MessageParser
  # message format => "room:harry_potter, nickname:siddharth, room:members"
  def self.parse(message)
    parsed_message = JSON.parse(message)

    response = {}
    if parsed_message['status'] == 'status'
      response[:status] = 'STATUS'
      response[:username] = parsed_message['username']
      response[:roomname] = parsed_message['roomname']
    elsif parsed_message['status'] == 'message'
      response[:status]   = 'MESSAGE'
      response[:message]  = parsed_message['message']
      response[:roomname] = parsed_message['roomname'].split().join('_')
    end

    response
  end
end

class MongoManager
  def self.establish_connection(database)
    @db ||= Mongo::Connection.new('localhost', 27017).db(database)
    @db.collection('rooms')

    @db
  end  
end


@sockets = []
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)

  puts "Connected to AMQP broker. #{AMQP::VERSION} "

  mongo = MongoManager.establish_connection("trackertalk_development")

  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}
    ws.onopen do 
      @sockets << socket_detail

    end

    ws.onmessage do |message|

      status  = MessageParser.parse(message)         
      exchange = channel.fanout(status[:roomname].split().join('_'))   

      if status[:status] == 'STATUS'               
         queue = channel.queue(status[:username], :durable => true)

        unless queue.subscribed? 
         puts "--------- SUBSCRIBED --------------"
         queue.bind(exchange).subscribe do |payload|
            puts "PAYLOAD :  #{payload}"
            ws.send(payload)
          end 
        else
          puts "----ALREADY SUBSCRIBED"
        end                  

        # only after 0.8.0rc14
        #queue = channel.queue(status[:username], :durable => true)      
        #AMQP::Consumer.new(channel, queue)        

      elsif status[:status] == 'MESSAGE'
        puts "********************* Message- published ******************************"
        exchange.publish(status[:message)  
      end                  
    end

    ws.onclose do 
      @sockets.delete ws
    end
  end    
end

I use the status to indicate whether the incoming message is a message for ongoing chat or for a status message requiring me to handle chores like subscribing to the queue.

The problem i face is that when I send a message like
socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

The exchange.publish' is called but it still doesn't get pushed via thews.send` to the browser.

Is there something fundamentally wrong with my understanding of EventMachine and AMQP?

Here is the pastie for the same code http://pastie.org/private/xosgb8tw1w5vuroa4w7a

My code seems to work as desired when i remove the durable => true from queue = channel.queue(status[:username], :durable => true)

The following is a snippet of my Rails view which identifies the user's username and the roomname and sends it as part of message via Websockets.

Though the code seems to work when i remove the durable => true I fail to understand why that affects the message being delivered. Please Ignore the mongo part of as it does not play any part yet.

I would also like to know if my approach to AMQP and its usage is correct

<script>
    $(document).ready(function(){
        var username = '<%= @user.email %>';
        var roomname = 'Bazingaa';

        socket = new WebSocket('ws://127.0.0.1:8080/');

        socket.onopen = function(msg){
            console.log('connected');
            socket.send(JSON.stringify({status:'status', username:username, roomname:roomname}));
        }

        socket.onmessage = function(msg){
            $('#chat-log').append(msg.data);

        }

    });

</script>
<div class='block'>
  <div class='content'>
    <h2 class='title'><%= @room.name %></h2>
    <div class='inner'>
      <div id="chat-log">
      </div>

      <div id="chat-console">
        <textarea rows="5" cols="40"></textarea>
      </div>
    </div>
  </div>
</div>

<style>
    #chat-log{
        color:#000;
        font-weight:bold;
        margin-top:1em;
        width:900px;
        overflow:auto;
        height:300px;
    }
    #chat-console{
        bottom:10px;
    }

    textarea{
        width:100%;
        height:60px;
    }
</style>

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

微暖i 2024-12-02 12:43:13

我认为您的问题可能是队列在 ws.onmessage 调用之间挂在代理上。当客户端重新连接队列并且绑定已经存在时,因此 ws.send() 不会被调用。

默认情况下,当您创建队列时,它及其具有的任何绑定都会一直保留,直到代理重新启动,或者您明确告诉代理将其删除。

有两种方法可以更改此设置:

  • 在创建队列时添加 durable 标志,这将导致即使代理重新启动,队列也会保留下来
  • 添加 auto_delete 标志,这将导致代理在没有消费者连接的短时间内自动删除实体

如果您可以控制代理,则您正在使用rabbitmq代理,这是一种内省代理上发生的事情的简单方法是安装管理插件,为代理上的交换、绑定和队列提供 Web 界面。

I think your problem might be the queue hangs around on the broker between invocations of ws.onmessage. When the client reconnects the queue and binding already exists so ws.send() doesn't get called.

By default when you create a queue, it and any bindings it has, hangs around until the broker restarts, or you explicitly tell the broker to delete it.

There are two ways to change this:

  • Adding the durable flag when you create the queue, which will cause the queue to stick around even if the broker restarts
  • Adding the auto_delete flag, which will cause the broker to automatically delete the entity after a short amount of time of not being having a consumer attached to it

If you have control over the broker you are using the rabbitmq broker, an easy way to introspect what is happening on the broker is to install the management plugin, which provides a web interface to exchanges, bindings and queues on the broker.

等风也等你 2024-12-02 12:43:13

乍一看,AMQP 位似乎没问题,但我不想设置所有依赖项。如果您提供仅包含 AMQP 部分的最小示例,我会检查它。

On the first look the AMQP bits seem to be OK, but I don't want to set up all the dependencies. If you provide a minimal example with just the AMQP part, I will check it.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文