关于mqtt多种实现方式的各种问题?

发布于 2022-09-04 16:04:53 字数 11660 浏览 18 评论 0

最近在学习做mqtt的web端小项目,然后用的各种方法都有问题:
1.第一种方法使用的Ajax长轮询,客户端发送以个ajax请求,如果有订阅的消息就马上发送回来,处理完信息后再次发起请求,若没有消息则一直hold在服务器端。但是因为MqttSubscribe使用callback模式,订阅的信息来到后会自己调用messageArrived()方法,不知道怎么将数据获取并返回给前台:

public class MQTTSubscribe implements MqttCallback{  
  
    public static final String HOST = "tcp:/localhost:61613";
        
    public static final String TOPIC = "MQTTtest";  
    private static final String clientid = "subscriber";  
    private MqttClient client;  
    private MqttConnectOptions options;  
    private String userName = "admin";  
    private String passWord = "password";  
    private ScheduledExecutorService scheduler;  
  
    
    public void startReconnect() {  
        scheduler = Executors.newSingleThreadScheduledExecutor();  
        scheduler.scheduleAtFixedRate(new Runnable() {  
            public void run() {  
                if (!client.isConnected()) {  
                    try {  
                        client.connect(options);  
                    } catch (MqttSecurityException e) {  
                        e.printStackTrace();  
                    } catch (MqttException e) {  
                        e.printStackTrace();  
                    }  
                }  
            }  
        }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);  
    }   
  
    private void start() {  
        try {    
            client = new MqttClient(HOST, clientid, new MemoryPersistence());  
            options = new MqttConnectOptions();  
            options.setCleanSession(false);  
            options.setUserName(userName);  
            options.setPassword(passWord.toCharArray());  
            options.setConnectionTimeout(10);   
            options.setKeepAliveInterval(20);  
            client.setCallback(new MQTTCallback()); 
            MqttTopic topic = client.getTopic(TOPIC);  
            options.setWill(topic, "close".getBytes(), 2, true);  
              
            client.connect(options);  
            int[] Qos  = {2};  
            String[] topic1 = {TOPIC};  
            client.subscribe(topic1, Qos);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
    
    
    public static void main(String[] args) throws MqttException {     
        MQTTSubscribe client = new MQTTSubscribe();  
        client.start();  
    }

    @Override
    public void connectionLost(Throwable arg0) {
        System.out.println("Connection lost, reconnect please!");         
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {        
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
          System.out.println("Message arrived on topic:"+topic);  
          System.out.println("Message arrived on QoS:"+message.getQos());  
          System.out.println("Message arrived on content:"+new String(message.getPayload()));      
        
    }     
}  

2.第二种方法使用websocket实现,在服务器端的onOpen()方法中开启订阅,但是这样子的话如果要调用sendMessage()方法发布消息就会使连接断掉:

//该注解用来指定一个URI,客户端可以通过这个URI来连接到WebSocket。类似Servlet的注解mapping。无需在web.xml中配置。
@ServerEndpoint("/chat/{deviceId}/{userId}")
public class MyWebSocket {
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
    public static CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<MyWebSocket>();

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    public MQTTSubscribe client = new MQTTSubscribe();

    /**
     * 连接建立成功调用的方法
     * @param session  可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(@PathParam("deviceId") int deviceId, @PathParam("userId") int userId, Session session){
        this.session = session;
        System.out.println(deviceId +","+ userId);
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1
        System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
        client.start("111", "websocket");
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(){
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("来自客户端的消息:" + message);
    }
    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error){
        System.out.println("发生错误");
        error.printStackTrace();
    }

    /**
     * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
            this.session.getBasicRemote().sendText(message);
        //this.session.getAsyncRemote().sendText(message);
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        MyWebSocket.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        MyWebSocket.onlineCount--;
    }
}

3.第三种方法采用官方的mqttws31.js库,这个参数我感觉配置得没错,但是不知道为什么只有在第一次连接时不会出错,客户端离线重连就取不出离线这段时间的消息,而且重连后发布的消息有时也取不出来,消息全部在broker上,没有推送,也没有清除:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>Device</title>
    <!--[if lt IE 9]>
    <script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script>
    <![endif]-->
    <link href="css/bootstrap.min.css" rel="stylesheet">
    <link href="css/bootstrap.min.responsive.css" rel="stylesheet">
    <style type="text/css">
        body { padding-top: 40px; }
    </style>
</head>

<body>

<div class="navbar navbar-fixed-top">
    <div class="navbar-inner">
        <div class="container">
            <a class="brand" href="#">Apollo MQTT WebSocket Chat Example</a>
        </div>
    </div>
</div>

<div class="container-fluid">
    <div class="row-fluid">
        <div class="span6">
            <div id="connect">
                <div class="page-header">
                    <h2>Server Login</h2>
                </div>
                <form class="form-horizontal" id='connect_form'>

                    <div class="form-actions">
                        <button id='connect_submit' type="submit" class="btn btn-large btn-primary">Connect</button>
                    </div>
                </form>
            </div>
            <div id="connected" style="display:none">
                <div class="page-header">
                    <h2>Chat Room</h2>
                </div>
                <div id="messages">
                </div>
                <form class="well form-search" id='send_form'>
                    <button class="btn" type="button" id='disconnect' style="float:right">Disconnect</button>
                    <input class="input-medium" id='send_form_input' placeholder="Type your message here" class="span6"/>
                    <button class="btn" type="submit">Send</button>
                </form>
            </div>
        </div>
        <div class="span4">
            <div class="page-header">
                <h2>Debug Log</h2>
            </div>
            <pre id="debug"></pre>
        </div>
    </div>
</div>

<!-- Scripts placed at the end of the document so the pages load faster -->
<script src='js/jquery.js'></script>
<script src="js/mqttws31.js"></script>
<script>//<![CDATA[
$(document).ready(function(){

    $("#connect_clientId").val("example-"+(Math.floor(Math.random() * 100000)));
    if( !window.WebSocket) {
        $("#connect").html("\
        <h1>Get a new Web Browser!</h1>\
        <p>\
        Your browser does not support WebSockets. This example will not work properly.<br>\
        Please use a Web Browser with WebSockets support (WebKit or Google Chrome).\
        </p>\
    ");
    } else {

        var client, destination;

        $('#connect_form').submit(function() {
            var host = '${info.host}';
            var port = '${info.port}';
            var clientId = '${info.clientId}';
            var user = '${info.username}';
            var password = '${info.password}';

            destination = '${info.topic}';


            client = new Messaging.Client(host, Number(port), clientId);

            client.onConnect = onConnect;

            client.onMessageArrived = onMessageArrived;
            client.onConnectionLost = onConnectionLost;

            client.connect({
                cleanSession:false,
                userName:user,
                password:password,
                onSuccess:onConnect,
                onFailure:onFailure
            });
            return false;
        });

        // the client is notified when it is connected to the server.
        var onConnect = function(frame) {
            debug("connected to MQTT");
            $('#connect').fadeOut({ duration: 'fast' });
            $('#connected').fadeIn();
            client.subscribe(destination, 2);
        };

        // this allows to display debug logs directly on the web page
        var debug = function(str) {
            $("#debug").append(document.createTextNode(str + "\n"));
        };

        $('#disconnect').click(function() {
            client.disconnect();
            $('#connected').fadeOut({ duration: 'fast' });
            $('#connect').fadeIn();
            $("#messages").html("")
            return false;
        });

        $('#send_form').submit(function() {
            var text = $('#send_form_input').val();
            if (text) {
                message = new Messaging.Message(text);
                message.destinationName = '${info.topic1}';
                message.qos = 2;
                message.retained = true;
                client.send(message);
                $.ajax({
                    url: "pub.do",
                    type: "POST",
                    processDate: true,
                    data:{"topic":'${info.topic1}',"msg": text},
                    dataType: "json",
                    success: function () {
                        return;
                    }
                });
                $('#send_form_input').val("");
            }
            return false;
        });

        function onFailure(failure) {
            debug("failure")
            debug(failure.errorMessage);
        }

        function onMessageArrived(message) {
            var p = document.createElement("p");
            var t = document.createTextNode(message.payloadString);
            p.appendChild(t);
            $("#messages").append(p);
        }

        function onConnectionLost(responseObject) {
            if (responseObject.errorCode !== 0) {
                debug(client.clientId + ": " + responseObject.errorCode + "\n");
            }
        }


    }
});
//]]></script>

</body>
</html>

大神们求原因和解决方法。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文