Node.js + socket.io +当“re”连接时,node-amqp 和队列 binginds通过 socket.io 连接

发布于 2024-12-07 06:33:29 字数 1224 浏览 6 评论 0原文

我有一个非常接近此示例的场景:

一个主屏幕:

  • 此屏幕(客户端)将通过 server:9090/scope 连接到 socket.io 服务器(io.connect("http://server :9090/scope)) 并将向 socket.io 服务器发送一个事件“userBindOk” (socket.emit("userBindOk", message));

  • 服务器接收连接和“userBindOk”。此时,服务器应该获得与rabbitmq服务器的活动连接,并将队列绑定到刚刚通过socket.io连接到应用程序的相应用户。示例:

    socket.on("连接", 函数(客户端){ //客户端ID是1234 // 绑定rabbitmq交换器、队列,以及: queue.subscribe(//接收回调); })

  • 到目前为止,没问题 - 我可以通过 socket.io 发送/接收消息,没有任何问题。到目前为止

  • 但是,如果我刷新页面,所有这些步骤都将再次完成。结果,将发生与队列的绑定,但这一次与 socket.io 客户端的另一个会话相关。这意味着,如果我向与第一个 socket.io 会话(页面刷新之前)相关的队列发送一条消息,则该绑定应该(我认为)接收该消息并将其发送到无效的 socket.io 客户端(页面刷新 = socket.io 上下文中的新 client.id)。我可以证明这种行为,因为每次刷新页面时,我都需要发送 x 倍的消息。例如:我第一次连接: - 所以,1 条消息 - 一个屏幕更新;刷新页面:我需要向队列发送 2 条消息,并且只会从“实际”socket.io 客户端会话接收第二条消息 - 这种行为将在我刷新页面时发生(20 次页面刷新,20 条消息)发送到队列,服务器 socket.io“最后一个”客户端会将消息发送到客户端 socket.io 以呈现到屏幕上)。

我相信的解决方案是:

  • 找到一种方法在与socket.io服务器断开连接时“解除绑定”队列 - 我在node-amqp api上还没有看到这个选项(等待它:D)

  • 找到一种重新连接的方法socket.io 客户端使用相同的 client.id。这样我就可以识别即将到来的客户端,并应用一些逻辑来缓存套接字。

有什么想法吗?我试图说得非常清楚......但是,正如你所知,当试图澄清某些特定于某些上下文的内容时,暴露你的问题并不是那么容易......

tks

I have one scenario which is very close to this sample:

One main screen:

  • this screen (client side) will connect to the socket.io server thru server:9090/scope (io.connect("http://server:9090/scope)) and will send one event "userBindOk" (socket.emit("userBindOk", message)) to the socket.io server;

  • the server receives the connection and the "userBindOk". At this moment, the server should get the active connection to rabbitmq server and bind the queue to the respective user that just connected to the application thru socket.io. sample:

    socket.on("connection", function(client){
    //client id is 1234
    // bind rabbitmq exchange, queue, and:
    queue.subscribe(//receive callback);
    })

  • So far, no problem - I can send/receive messages thru socket.io without problems.

  • BUT, If I refresh the page, all those steps will be done again. As consequence, the binding to the queue will occur, but this time related to another session of the socket.io client. This means that if I send a message to the queue which is related to the first socket.io session (before the page refresh), that bind should (I think) receive the message and send it to a invalid socket.io client (page refresh = new client.id on the socket.io context). I can prove this behaviour because every time I refresh the page I need to send x times more messages. For instance: I`ve connected for the first time: - so, 1 message - one screen update; refresh the page: I need to send 2 messages to the queue and only the second message will be received from the "actual" socket.io client session - this behaviour will occur as many as I refresh the page (20 page refreshs, 20 messages to be sent to a queue and the server socket.io "last" client will send the message to the client socket.io to render into the screen).

The solutions I believe are:

  • Find a way to "unbind" the queue when disconnecting from the socket.io server - I didn`t see this option at the node-amqp api yet (waiting for it :D)

  • find a way to reconnect the socket.io client using the same client.id. This way I can identify the client that is coming and apply some logic to cache the socket.

Any ideas? I tried to be very clear... But, as you know, it`s not so eaey to expose your problem when trying to clarify something that is very specific to some context...

tks

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

阳光的暖冬 2024-12-14 06:33:29

我这样解决了:

我曾经将rabbitMq队列声明为durable = true,autoDelete = false,exclusive = false,在我的应用程序中,有1个队列/用户和1个交换(type = direct),其routing_key name =queueName,我的应用程序还使用与 Android 应用程序或 iPhone 应用程序等浏览器不同的其他客户端的队列作为推送后备,因此我用来为每个用户创建 1 个队列。

这个问题的解决方案是更改我的rabbitMQ队列和交换声明。现在我将交换/用户声明为扇出和autoDelete = True,并且用户将拥有N个队列,其中durable = true,autoDelete = true,exclusive = true(No.queue = No.clients)并且所有队列都绑定到用户交换机(多播)。

注意:我的应用程序是用 django 编写的,我使用 node+socket+amqp 来使用 web.scokets 与浏览器进行通信,因此我使用 node-restler 来查询我的应用程序 api 以获取用户队列信息。

这就是rabbitMQ端,对于node+amqp+socket我这样做了:

服务器端:

  • onConnect:将用户交换声明为扇出,自动删除,持久。然后将队列声明为持久、自动删除和独占,然后将queue.bind绑定到用户交换,最后queue.subscribe和socket.disconnect将销毁队列,因此当客户端连接应用程序时将存在队列这解决了刷新问题,并允许用户在应用程序中拥有超过 1 个窗口选项卡:

服务器端:

            /*
             * unCaught exception handler
             */

            process.on('uncaughtException', function (err) {
                sys.p('Caught exception: ' + err);
                global.connection.end();
            });


            /*
             * Requiere libraries
             */

            global.sys =  require('sys');
            global.amqp = require('amqp');
            var rest = require('restler');
            var io = require('socket.io').listen(8080);

            /*
             * Module global variables
             */
            global.amqpReady = 0;


            /*
             * RabbitMQ connection
             */

            global.connection = global.amqp.createConnection({
                             host: host,
                             login: adminuser,
                             password: adminpassword,
                             vhost: vhost
                            });

            global.connection.addListener('ready', 
                        function () {
                            sys.p("RabbitMQ connection stablished");
                            global.amqpReady = 1;
                        }
            );


            /*
             * Web-Socket declaration
             */ 

            io.sockets.on('connection', function (socket) {
                socket.on('message', function (data) {
                    sys.p(data);
                    try{
                        var message = JSON.parse(data);                 
                    }catch(error){
                        socket.emit("message", JSON.stringify({"error": "invalid_params", "code": 400}));
                        var message = {};
                    }           
                    var message = JSON.parse(data);
                    if(message.token != undefined) {

                      rest.get("http://dev.kinkajougames.com/api/push",
                                {headers: 
                                    {
                                        "x-geochat-auth-token": message.token 
                                    }
                                }).on('complete', 
                                    function(data) {
                                        a = data;
                                }).on('success',
                                    function (data){
                                        sys.p(data);
                                        try{                                
                                            sys.p("---- creating exchange");
                                            socket.exchange = global.connection.exchange(data.data.bind, {type: 'fanout', durable: true, autoDelete: true});
                                            sys.p("---- declarando queue");
                                            socket.q = global.connection.queue(data.data.queue, {durable: true, autoDelete: true, exclusive: false},
                                                function (){
                                                    sys.p("---- bind queue to exchange");
                                                    //socket.q.bind(socket.exchange, "*");
                                                    socket.q.bind(socket.exchange, "*");
                                                    sys.p("---- subscribing queue exchange");
                                                    socket.q.subscribe(function (message) {
                                                        socket.emit("message", message.data.toString());
                                                    });     
                                                }
                                            );
                                        }catch(err){
                                            sys.p("Imposible to connection to rabbitMQ-server");
                                        }                                   

                                }).on('error', function (data){
                                    a = {
                                        data: data,
                                    };
                                }).on('400', function() {
                                    socket.emit("message", JSON.stringify({"error": "connection_error", "code": 400}));
                                }).on('401', function() {
                                    socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                                });               
                    }
                    else {
                      socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                    }

                });
                socket.on('disconnect', function () {
                    socket.q.destroy(); 
                    sys.p("closing socket");
                });
            });

客户端:

  • 带有选项“强制新连接”=true 和“卸载时同步断开连接”的套接字实例=假。
  • 客户端使用onbeforeunload和onunload windows对象事件发送socket.disconnect
  • 客户端在socket.connect事件上将用户令牌发送到节点。
  • 处理来自套接字的消息

     var 套接字;
            函数 webSocket(){
                //var 套接字 = new io.Socket();
                socket = io.connect("ws.dev.kinkajougames.com", {'强制新连接':true, '卸载时同步断开连接': false});
                //socket.connect();
    
                onSocketConnect = 函数(){
                    警报('已连接');
                    socket.send(JSON.stringify({
                        令牌:Get_Cookie('liveScoopToken')
                    }));
                };
    
                socket.on('connect', onSocketConnect);
                socket.on('消息', 函数(数据){
                    消息= JSON.parse(数据);
                    if (message.action == "聊天") {
                        if (idList[message.data.sender] != undefined) {
                            chatboxManager.dispatch(message.data.sender, {
                                名字:消息.数据.发件人
                            }, 消息.数据.消息);
                        }
                        别的 {
                            var 用户名 = message.data.sender;
                            Data.Collections.Chats.add({
                                id:用户名,
                                标题:用户名,
                                用户:用户名,
                                描述:“聊天”,
                                名字:用户名,
                                姓: ””
                            });
                            idList[message.data.sender] = message.data.sender;
                            chatboxManager.addBox(message.data.sender, {
                                标题:用户名,
                                用户:用户名,
                                描述:“聊天”,
                                名字:用户名,
                                姓: ””,
                                boxClosed:函数(id){
                                    警报(“关闭”);
                                }
                            });
                            chatboxManager.dispatch(message.data.sender, {
                                名字:消息.数据.发件人
                            }, 消息.数据.消息);
                        }
                    }
                });
            }                           
    
            webSocket();
    
            window.onbeforeunload = function() {
                return "您进行了未保存的更改。您是否仍要离开此页面?";
            }
    
            window.onunload = 函数(){
                套接字.disconnect();
            }
    

就这样,因此不再需要消息的循环。

I solved it like this:

I used to declare the rabbitMq queue as durable=true,autoDelete=false,exclusive=false and in my app there was 1 queue/user and 1 exchange(type=direct) with the routing_key name=queueName, my app also used the queue for other client diffent to browsers like android app or iphone app as push fallback, so i use to crear 1 queue for earch user.

The solution to this problem was to change my rabbitMQ queue and exchange declaration. Now i declare the exchange/user as fanout and autoDelete=True, and the user is going to have N queues with durable=true, autoDelete=true, exclusive=true (No. queue = No. clients) and all the queues are bind to the user-exchange(multicast).

NOTE: my app is wirten in django, and i use node+socket+amqp to be able to comunicate with the browser using web.scokets, so i use node-restler to query my app api to get the user-queue info.

thats the rabbitMQ side, for the node+amqp+socket i did this:

server-side:

  • onConnect: the declaration of the user exchange as fanout, autoDelete, durable. then declaration of the queue as durable, autodelete and exclusive, then the queue.bind to the user-exchange and finaly the queue.subscribe and the socket.disconnect will destroy the queue so there are going to exist queue as client connected the app and this solve the problem of the refresh and allow the user to have more than 1 window-tab with the app:

Server-side:

            /*
             * unCaught exception handler
             */

            process.on('uncaughtException', function (err) {
                sys.p('Caught exception: ' + err);
                global.connection.end();
            });


            /*
             * Requiere libraries
             */

            global.sys =  require('sys');
            global.amqp = require('amqp');
            var rest = require('restler');
            var io = require('socket.io').listen(8080);

            /*
             * Module global variables
             */
            global.amqpReady = 0;


            /*
             * RabbitMQ connection
             */

            global.connection = global.amqp.createConnection({
                             host: host,
                             login: adminuser,
                             password: adminpassword,
                             vhost: vhost
                            });

            global.connection.addListener('ready', 
                        function () {
                            sys.p("RabbitMQ connection stablished");
                            global.amqpReady = 1;
                        }
            );


            /*
             * Web-Socket declaration
             */ 

            io.sockets.on('connection', function (socket) {
                socket.on('message', function (data) {
                    sys.p(data);
                    try{
                        var message = JSON.parse(data);                 
                    }catch(error){
                        socket.emit("message", JSON.stringify({"error": "invalid_params", "code": 400}));
                        var message = {};
                    }           
                    var message = JSON.parse(data);
                    if(message.token != undefined) {

                      rest.get("http://dev.kinkajougames.com/api/push",
                                {headers: 
                                    {
                                        "x-geochat-auth-token": message.token 
                                    }
                                }).on('complete', 
                                    function(data) {
                                        a = data;
                                }).on('success',
                                    function (data){
                                        sys.p(data);
                                        try{                                
                                            sys.p("---- creating exchange");
                                            socket.exchange = global.connection.exchange(data.data.bind, {type: 'fanout', durable: true, autoDelete: true});
                                            sys.p("---- declarando queue");
                                            socket.q = global.connection.queue(data.data.queue, {durable: true, autoDelete: true, exclusive: false},
                                                function (){
                                                    sys.p("---- bind queue to exchange");
                                                    //socket.q.bind(socket.exchange, "*");
                                                    socket.q.bind(socket.exchange, "*");
                                                    sys.p("---- subscribing queue exchange");
                                                    socket.q.subscribe(function (message) {
                                                        socket.emit("message", message.data.toString());
                                                    });     
                                                }
                                            );
                                        }catch(err){
                                            sys.p("Imposible to connection to rabbitMQ-server");
                                        }                                   

                                }).on('error', function (data){
                                    a = {
                                        data: data,
                                    };
                                }).on('400', function() {
                                    socket.emit("message", JSON.stringify({"error": "connection_error", "code": 400}));
                                }).on('401', function() {
                                    socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                                });               
                    }
                    else {
                      socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                    }

                });
                socket.on('disconnect', function () {
                    socket.q.destroy(); 
                    sys.p("closing socket");
                });
            });

client-side:

  • The socket intance with options 'force new connection'=true and 'sync disconnect on unload'= false.
  • The client side use the onbeforeunload and onunload windows object events to send socket.disconnect
  • The client on socket.connect event send the user token to node.
  • proces message from socket

            var socket;
            function webSocket(){
                //var socket = new io.Socket();
                socket = io.connect("ws.dev.kinkajougames.com", {'force new connection':true, 'sync disconnect on unload': false});
                //socket.connect();
    
                onSocketConnect = function(){
                    alert('Connected');
                    socket.send(JSON.stringify({
                        token: Get_Cookie('liveScoopToken')
                    }));
                };
    
                socket.on('connect', onSocketConnect);
                socket.on('message', function(data){
                    message = JSON.parse(data);
                    if (message.action == "chat") {
                        if (idList[message.data.sender] != undefined) {
                            chatboxManager.dispatch(message.data.sender, {
                                first_name: message.data.sender
                            }, message.data.message);
                        }
                        else {
                            var username = message.data.sender;
                            Data.Collections.Chats.add({
                                id: username,
                                title: username,
                                user: username,
                                desc: "Chat",
                                first_name: username,
                                last_name: ""
                            });
                            idList[message.data.sender] = message.data.sender;
                            chatboxManager.addBox(message.data.sender, {
                                title: username,
                                user: username,
                                desc: "Chat",
                                first_name: username,
                                last_name: "",
                                boxClosed: function(id){
                                    alert("closing");
                                }
                            });
                            chatboxManager.dispatch(message.data.sender, {
                                first_name: message.data.sender
                            }, message.data.message);
                        }
                    }
                });
            }                           
    
            webSocket();
    
            window.onbeforeunload = function() {
                return "You have made unsaved changes. Would you still like to leave this page?";
            }
    
            window.onunload = function (){
                socket.disconnect();
            }
    

And that's it, so no more round-robing of the message.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文