MQTT无法接收消息

发布于 2022-09-01 07:14:52 字数 4321 浏览 14 评论 0

最近在学习android推送的实现,Client用的paho,broker用的Apollo,在写测试demo时出现了以下情况:

  1. 在app中subscribe主题topic1,broker中没有topic1,则后台publish消息到topic1后,app的connectionLost方法被回调,而messageArrived并没有被回调;
  2. 在app中subscribe主题topic1,broker中存在topic1,则app的MqttCallback中的messageArrived方法被回调。如果此时在后台再次publish消息到topic1,则app的connectionLost被回调。

可能我的表达不够清晰,请看程序代码,请问我的代码有什么问题。

Android代码

MainActivity.java:
public class MainActivity extends Activity {
    private TextView tv;
    private Handler mhandler = new Handler() {
        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            String strContent = tv.getText().toString();
            strContent += "\n" + msg.getData().getString("content");
            //每接收一次消息,将消息内容追加到textview中显示
            tv.setText(strContent);
        }
    };

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        tv = (TextView) findViewById(R.id.tv);
        try {
            Subscribe.doTest(mhandler);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Subscribe.java:
public class Subscribe {
    private final static String userName = "admin";
    private final static String passWord = "password";
    private final static String HOST = "tcp://10.0.2.2:61613";
    private final static String TOPIC = "t1";
    private static MqttClient client;

    public static void doTest(final Handler mhandler) throws Exception {
        client = new MqttClient(HOST, "java_client", new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        client.setCallback(new MqttCallback() {

            @Override
            public void messageArrived(String arg0, MqttMessage arg1)
                    throws Exception {
                Message msg = Message.obtain();
                Bundle bundle = new Bundle();
                bundle.putString("content", arg1.toString());
                msg.setData(bundle);
                mhandler.sendMessage(msg);
                System.out.println("Client messageArrived");
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken arg0) {
                System.out.println("Client deliveryComplete");
            }

            @Override
            public void connectionLost(Throwable arg0) {
                System.out.println("Client connectionLost");
                if (!client.isConnected()) {
                    try {
                        client.connect();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        client.connect(options);
        client.subscribe(TOPIC);
    }
}

后台代码:

PubTest.java
public class PubTest {
    private static String userName = "admin";
    private static String passWord = "password";
    private static String HOST = "tcp://127.0.0.1:61613";
    private static MqttClient client;

    public static void main(String[] args) {
        try {
            client = new MqttClient(HOST, "java_client",
                    new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            MqttTopic topic = client.getTopic("t1");
            MqttMessage message = new MqttMessage("m1".getBytes());
            message.setQos(1);
            message.setRetained(true);
            client.connect(options);
            MqttDeliveryToken token = topic.publish(message);
            while (!token.isComplete()) {
                token.waitForCompletion();
            }
            client.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

夜唯美灬不弃 2022-09-08 07:14:52

问题已经解决。

摇划花蜜的午后 2022-09-08 07:14:52

怎么解决的,我也碰到相同的问题了

所有深爱都是秘密 2022-09-08 07:14:52

你好!我是最近开始接触MQTT,然后自己私下写的demo跑起来总是有问题,方便发一个完整的DEMO给我看看吗?我的邮箱是1104567217@qq.com,服务器是Java开发的

你曾走过我的故事 2022-09-08 07:14:52

同样的问题,请问你是怎么解决的?

别理我 2022-09-08 07:14:52

先启动订阅进程,再启动发布进程,订阅进程才可以收到消息

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文