AMQP: dynamically creating subscribing queues
I am trying to build a simple chat application using AMQP, Websockets and Ruby. I understand that this may not be the best use case for learning AMQP, but I would like to understand where I am going wrong.
The following is my amqp-server code:
require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'

class MessageParser
  # message format => "room:harry_potter, nickname:siddharth, room:members"
  def self.parse(message)
    parsed_message = JSON.parse(message)
    response = {}
    if parsed_message['status'] == 'status'
      response[:status] = 'STATUS'
      response[:username] = parsed_message['username']
      response[:roomname] = parsed_message['roomname']
    elsif parsed_message['status'] == 'message'
      response[:status] = 'MESSAGE'
      response[:message] = parsed_message['message']
      response[:roomname] = parsed_message['roomname'].split().join('_')
    end
    response
  end
end

class MongoManager
  def self.establish_connection(database)
    @db ||= Mongo::Connection.new('localhost', 27017).db(database)
    @db.collection('rooms')
    @db
  end
end

@sockets = []

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)
  puts "Connected to AMQP broker. #{AMQP::VERSION} "
  mongo = MongoManager.establish_connection("trackertalk_development")

  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}

    ws.onopen do
      @sockets << socket_detail
    end

    ws.onmessage do |message|
      status = MessageParser.parse(message)
      # one fanout exchange per room, named after the room
      exchange = channel.fanout(status[:roomname].split().join('_'))
      if status[:status] == 'STATUS'
        # one queue per user, bound to the room's exchange
        queue = channel.queue(status[:username], :durable => true)
        unless queue.subscribed?
          puts "--------- SUBSCRIBED --------------"
          queue.bind(exchange).subscribe do |payload|
            puts "PAYLOAD : #{payload}"
            ws.send(payload)
          end
        else
          puts "----ALREADY SUBSCRIBED"
        end
        # only after 0.8.0rc14
        #queue = channel.queue(status[:username], :durable => true)
        #AMQP::Consumer.new(channel, queue)
      elsif status[:status] == 'MESSAGE'
        puts "********************* Message- published ******************************"
        exchange.publish(status[:message])
      end
    end

    ws.onclose do
      @sockets.delete ws
    end
  end
end
I use the status field to indicate whether an incoming message is a chat message for the ongoing conversation or a status message that requires me to handle chores like subscribing to the queue.
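To make the two kinds of frame concrete, here is a minimal sketch of that dispatch using only the json standard library. The name parse_chat_frame is hypothetical and not part of the server above. Unlike MessageParser, it normalises the room name in both branches, which is an assumption on my part about what is intended, since the server applies the same split/join again when it names the exchange.

```ruby
require 'json'

# Hypothetical helper (not in the original server): turns a raw websocket
# frame into the hash that the onmessage handler branches on.
def parse_chat_frame(raw)
  data = JSON.parse(raw)
  # "Harry Potter" -> "Harry_Potter", the same normalisation the server
  # applies before naming the fanout exchange
  room = data['roomname'].to_s.split.join('_')

  case data['status']
  when 'status'
    # subscription request: which user wants to listen to which room
    { :status => 'STATUS', :username => data['username'], :roomname => room }
  when 'message'
    # chat line to publish to the room's exchange
    { :status => 'MESSAGE', :message => data['message'], :roomname => room }
  else
    # unknown frame type: nothing to act on
    {}
  end
end

subscribe = parse_chat_frame('{"status":"status","username":"sid@example.com","roomname":"Harry Potter"}')
chat      = parse_chat_frame('{"status":"message","message":"test","roomname":"Harry Potter"}')

puts subscribe[:status]
puts chat[:status]
```

Both frames resolve to the same room name, so a subscriber and a publisher for "Harry Potter" end up on the same exchange.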
The problem I face is that when I send a message like
socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))
exchange.publish is called, but the message still doesn't get pushed to the browser via ws.send.
Is there something fundamentally wrong with my understanding of EventMachine and AMQP?
Here is the pastie for the same code: http://pastie.org/private/xosgb8tw1w5vuroa4w7a
My code seems to work as desired when I remove :durable => true from
queue = channel.queue(status[:username], :durable => true)
Though the code seems to work without that option, I fail to understand why it affects the message being delivered. Please ignore the mongo part, as it does not play any part yet.
I would also like to know if my approach to AMQP and its usage is correct.
The following is a snippet of my Rails view, which identifies the user's username and the roomname and sends them as part of the message via Websockets.
<script>
  $(document).ready(function(){
    var username = '<%= @user.email %>';
    var roomname = 'Bazingaa';
    socket = new WebSocket('ws://127.0.0.1:8080/');
    socket.onopen = function(msg){
      console.log('connected');
      socket.send(JSON.stringify({status:'status', username:username, roomname:roomname}));
    }
    socket.onmessage = function(msg){
      $('#chat-log').append(msg.data);
    }
  });
</script>

<div class='block'>
  <div class='content'>
    <h2 class='title'><%= @room.name %></h2>
    <div class='inner'>
      <div id="chat-log">
      </div>
      <div id="chat-console">
        <textarea rows="5" cols="40"></textarea>
      </div>
    </div>
  </div>
</div>

<style>
  #chat-log{
    color:#000;
    font-weight:bold;
    margin-top:1em;
    width:900px;
    overflow:auto;
    height:300px;
  }
  #chat-console{
    bottom:10px;
  }
  textarea{
    width:100%;
    height:60px;
  }
</style>
Comments (2)
I think your problem might be that the queue hangs around on the broker between invocations of ws.onmessage. When the client reconnects, the queue and binding already exist, so ws.send() doesn't get called.
By default, when you create a queue, it and any bindings it has hang around until the broker restarts or you explicitly tell the broker to delete it.
There are two ways to change this:
If you have control over the broker and you are using the RabbitMQ broker, an easy way to introspect what is happening on the broker is to install the management plugin, which provides a web interface to the exchanges, bindings and queues on the broker.
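As an illustration of a queue that does not linger, here is a minimal sketch that declares the per-user queue with the :auto_delete option, which asks the broker to remove the queue once its last consumer has gone away. The helper name bind_session_queue and the "chat." name prefix are hypothetical; `channel` is assumed to behave like an AMQP::Channel from the amqp gem and `exchange` like the fanout exchange in the question. A fresh queue name is used on the assumption that the old durable queue may still exist on the broker, since redeclaring an existing queue with different attributes is a channel error in AMQP 0-9-1.

```ruby
# Hypothetical helper, not from the question's code. `channel` is assumed to
# respond to #queue(name, opts) the way AMQP::Channel does, and the returned
# queue to #bind(exchange).
def bind_session_queue(channel, exchange, username)
  # :auto_delete => true asks the broker to delete the queue after its last
  # consumer is cancelled or disconnects, so nothing is left over between
  # websocket sessions
  queue = channel.queue("chat.#{username}", :auto_delete => true)
  queue.bind(exchange)
  queue
end

# Intended use inside the STATUS branch of ws.onmessage (needs a running
# broker, so it is left commented out here):
#
#   bind_session_queue(channel, exchange, status[:username]).subscribe do |payload|
#     ws.send(payload)
#   end
```

Whether this fits depends on whether messages should be kept for users who are offline; with auto-delete they are not.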
At first glance the AMQP bits seem to be OK, but I don't want to set up all the dependencies. If you provide a minimal example with just the AMQP part, I will check it.