关于mqtt多种实现方式的各种问题?
最近在学习做mqtt的web端小项目,然后用的各种方法都有问题:
1.第一种方法使用的Ajax长轮询,客户端发送以个ajax请求,如果有订阅的消息就马上发送回来,处理完信息后再次发起请求,若没有消息则一直hold在服务器端。但是因为MqttSubscribe使用callback模式,订阅的信息来到后会自己调用messageArrived()方法,不知道怎么将数据获取并返回给前台:
public class MQTTSubscribe implements MqttCallback{
public static final String HOST = "tcp:/localhost:61613";
public static final String TOPIC = "MQTTtest";
private static final String clientid = "subscriber";
private MqttClient client;
private MqttConnectOptions options;
private String userName = "admin";
private String passWord = "password";
private ScheduledExecutorService scheduler;
public void startReconnect() {
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
if (!client.isConnected()) {
try {
client.connect(options);
} catch (MqttSecurityException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
}
private void start() {
try {
client = new MqttClient(HOST, clientid, new MemoryPersistence());
options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
client.setCallback(new MQTTCallback());
MqttTopic topic = client.getTopic(TOPIC);
options.setWill(topic, "close".getBytes(), 2, true);
client.connect(options);
int[] Qos = {2};
String[] topic1 = {TOPIC};
client.subscribe(topic1, Qos);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws MqttException {
MQTTSubscribe client = new MQTTSubscribe();
client.start();
}
@Override
public void connectionLost(Throwable arg0) {
System.out.println("Connection lost, reconnect please!");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message arrived on topic:"+topic);
System.out.println("Message arrived on QoS:"+message.getQos());
System.out.println("Message arrived on content:"+new String(message.getPayload()));
}
}
2.第二种方法使用websocket实现,在服务器端的onOpen()方法中开启订阅,但是这样子的话如果要调用sendMessage()方法发布消息就会使连接断掉:
//该注解用来指定一个URI,客户端可以通过这个URI来连接到WebSocket。类似Servlet的注解mapping。无需在web.xml中配置。
@ServerEndpoint("/chat/{deviceId}/{userId}")
public class MyWebSocket {
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
public static CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<MyWebSocket>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
public MQTTSubscribe client = new MQTTSubscribe();
/**
* 连接建立成功调用的方法
* @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
@OnOpen
public void onOpen(@PathParam("deviceId") int deviceId, @PathParam("userId") int userId, Session session){
this.session = session;
System.out.println(deviceId +","+ userId);
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
client.start("111", "websocket");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(){
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
* @param session 可选的参数
*/
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("来自客户端的消息:" + message);
}
/**
* 发生错误时调用
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error){
System.out.println("发生错误");
error.printStackTrace();
}
/**
* 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
* @param message
* @throws IOException
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
//this.session.getAsyncRemote().sendText(message);
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
MyWebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
MyWebSocket.onlineCount--;
}
}
3.第三种方法采用官方的mqttws31.js库,这个参数我感觉配置得没错,但是不知道为什么只有在第一次连接时不会出错,客户端离线重连就取不出离线这段时间的消息,而且重连后发布的消息有时也取不出来,消息全部在broker上,没有推送,也没有清除:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Device</title>
<!--[if lt IE 9]>
<script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script>
<![endif]-->
<link href="css/bootstrap.min.css" rel="stylesheet">
<link href="css/bootstrap.min.responsive.css" rel="stylesheet">
<style type="text/css">
body { padding-top: 40px; }
</style>
</head>
<body>
<div class="navbar navbar-fixed-top">
<div class="navbar-inner">
<div class="container">
<a class="brand" href="#">Apollo MQTT WebSocket Chat Example</a>
</div>
</div>
</div>
<div class="container-fluid">
<div class="row-fluid">
<div class="span6">
<div id="connect">
<div class="page-header">
<h2>Server Login</h2>
</div>
<form class="form-horizontal" id='connect_form'>
<div class="form-actions">
<button id='connect_submit' type="submit" class="btn btn-large btn-primary">Connect</button>
</div>
</form>
</div>
<div id="connected" style="display:none">
<div class="page-header">
<h2>Chat Room</h2>
</div>
<div id="messages">
</div>
<form class="well form-search" id='send_form'>
<button class="btn" type="button" id='disconnect' style="float:right">Disconnect</button>
<input class="input-medium" id='send_form_input' placeholder="Type your message here" class="span6"/>
<button class="btn" type="submit">Send</button>
</form>
</div>
</div>
<div class="span4">
<div class="page-header">
<h2>Debug Log</h2>
</div>
<pre id="debug"></pre>
</div>
</div>
</div>
<!-- Scripts placed at the end of the document so the pages load faster -->
<script src='js/jquery.js'></script>
<script src="js/mqttws31.js"></script>
<script>//<![CDATA[
$(document).ready(function(){
$("#connect_clientId").val("example-"+(Math.floor(Math.random() * 100000)));
if( !window.WebSocket) {
$("#connect").html("\
<h1>Get a new Web Browser!</h1>\
<p>\
Your browser does not support WebSockets. This example will not work properly.<br>\
Please use a Web Browser with WebSockets support (WebKit or Google Chrome).\
</p>\
");
} else {
var client, destination;
$('#connect_form').submit(function() {
var host = '${info.host}';
var port = '${info.port}';
var clientId = '${info.clientId}';
var user = '${info.username}';
var password = '${info.password}';
destination = '${info.topic}';
client = new Messaging.Client(host, Number(port), clientId);
client.onConnect = onConnect;
client.onMessageArrived = onMessageArrived;
client.onConnectionLost = onConnectionLost;
client.connect({
cleanSession:false,
userName:user,
password:password,
onSuccess:onConnect,
onFailure:onFailure
});
return false;
});
// the client is notified when it is connected to the server.
var onConnect = function(frame) {
debug("connected to MQTT");
$('#connect').fadeOut({ duration: 'fast' });
$('#connected').fadeIn();
client.subscribe(destination, 2);
};
// this allows to display debug logs directly on the web page
var debug = function(str) {
$("#debug").append(document.createTextNode(str + "\n"));
};
$('#disconnect').click(function() {
client.disconnect();
$('#connected').fadeOut({ duration: 'fast' });
$('#connect').fadeIn();
$("#messages").html("")
return false;
});
$('#send_form').submit(function() {
var text = $('#send_form_input').val();
if (text) {
message = new Messaging.Message(text);
message.destinationName = '${info.topic1}';
message.qos = 2;
message.retained = true;
client.send(message);
$.ajax({
url: "pub.do",
type: "POST",
processDate: true,
data:{"topic":'${info.topic1}',"msg": text},
dataType: "json",
success: function () {
return;
}
});
$('#send_form_input').val("");
}
return false;
});
function onFailure(failure) {
debug("failure")
debug(failure.errorMessage);
}
function onMessageArrived(message) {
var p = document.createElement("p");
var t = document.createTextNode(message.payloadString);
p.appendChild(t);
$("#messages").append(p);
}
function onConnectionLost(responseObject) {
if (responseObject.errorCode !== 0) {
debug(client.clientId + ": " + responseObject.errorCode + "\n");
}
}
}
});
//]]></script>
</body>
</html>
大神们求原因和解决方法。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论