Why do I get an error when publishing to the config topic with Spring MQTT?

Posted on 2025-01-22 20:30:59

I'm sending a message to the "config" topic from my Spring Boot backend application.

Here's my MQTT setup:

    final String mqttServerAddress =
            String.format("ssl://%s:%s", options.mqttBridgeHostname, options.mqttBridgePort);

    // Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
    // Google Cloud IoT Core, it must be in the format below.
    final String mqttClientId =
            String.format(
                    "projects/%s/locations/%s/registries/%s/devices/%s",
                    options.projectId, options.cloudRegion, options.registryId, options.deviceId);

    MqttConnectOptions connectOptions = new MqttConnectOptions();
    // Note that Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
    // explicitly set this. If you don't set the MQTT version, the server will immediately close
    // its connection to your device.
    connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

    Properties sslProps = new Properties();
    sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
    connectOptions.setSSLProperties(sslProps);

    // With Google Cloud IoT Core, the username field is ignored, however it must be set for the
    // Paho client library to send the password field. The password field is used to transmit a JWT
    // to authorize the device.
    connectOptions.setUserName(options.userName);

    DateTime iat = new DateTime();
    if ("RS256".equals(options.algorithm)) {
        connectOptions.setPassword(
                createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
        connectOptions.setPassword(
                createJwtEs(options.projectId, options.privateKeyFileEC).toCharArray());
    } else {
        throw new IllegalArgumentException(
                "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    // [START iot_mqtt_publish]
    // Create a client, and connect to the Google MQTT bridge.
    MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

    // Both connect and publish operations may fail. If they do, allow retries but with an
    // exponential backoff time period.
    long initialConnectIntervalMillis = 500L;
    long maxConnectIntervalMillis = 6000L;
    long maxConnectRetryTimeElapsedMillis = 900000L;
    float intervalMultiplier = 1.5f;

    long retryIntervalMs = initialConnectIntervalMillis;
    long totalRetryTimeMs = 0;

    while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
        try {
            client.connect(connectOptions);
        } catch (MqttException e) {
            int reason = e.getReasonCode();

            // If the connection is lost or if the server cannot be connected, allow retries, but with
            // exponential backoff.
            System.out.println("An error occurred: " + e.getMessage());
            if (reason == MqttException.REASON_CODE_CONNECTION_LOST
                    || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
                System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
                Thread.sleep(retryIntervalMs);
                totalRetryTimeMs += retryIntervalMs;
                retryIntervalMs *= intervalMultiplier;
                if (retryIntervalMs > maxConnectIntervalMillis) {
                    retryIntervalMs = maxConnectIntervalMillis;
                }
            } else {
                throw e;
            }
        }
    }

    attachCallback(client, options.deviceId);



    // The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
    // required to be in the format below. Note that this is not the same as the device registry's
    // Cloud Pub/Sub topic.
    String mqttTopic = String.format("/devices/%s/%s", options.deviceId, options.messageType);
    long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
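    // MINUTES_PER_HOUR is not defined in this snippet; assuming it equals 60, the product below
    // is the token lifetime converted from minutes to seconds.
    if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {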
        System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
        iat = new DateTime();
        if ("RS256".equals(options.algorithm)) {
            connectOptions.setPassword(
                    createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
        } else if ("ES256".equals(options.algorithm)) {
            connectOptions.setPassword(
                    createJwtEs(options.projectId, options.privateKeyFileEC).toCharArray());
        } else {
            throw new IllegalArgumentException(
                    "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
        }
        client.disconnect();
        client.connect(connectOptions);
        attachCallback(client, options.deviceId);
    }

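    // Encode the payload as UTF-8. Passing the Charset (rather than its name) avoids the
    // checked UnsupportedEncodingException.
    MqttMessage message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8));
    message.setQos(1);
    client.publish(mqttTopic, message);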
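
For reference, the retry timing that the connect loop above produces can be worked out on its own. The following is only a minimal sketch: the class name `BackoffSchedule` and the method `intervals` are made up for illustration, and it reproduces just the interval arithmetic (start at 500 ms, multiply by 1.5, cap at 6000 ms, stop once the total budget is used), not the actual connection handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original code: it only mirrors the
// interval arithmetic of the connect retry loop.
public class BackoffSchedule {

    // Returns the sleep intervals (in ms) the loop would use if every
    // connection attempt failed with a retryable error.
    static List<Long> intervals(long initialMs, long maxMs, long budgetMs, float multiplier) {
        List<Long> result = new ArrayList<>();
        long interval = initialMs;
        long total = 0;
        while (total < budgetMs) {
            result.add(interval);
            total += interval;
            // Compound assignment narrows the float product back to long,
            // truncating any fraction, exactly as in the loop above.
            interval *= multiplier;
            if (interval > maxMs) {
                interval = maxMs;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> schedule = intervals(500L, 6000L, 900000L, 1.5f);
        // Prints [500, 750, 1125, 1687, 2530, 3795, 5692, 6000]
        System.out.println(schedule.subList(0, 8));
    }
}
```

With these values the interval reaches the 6-second cap after seven retries and stays there until the 15-minute budget runs out.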

Here's the options class:

public class MqttExampleOptions {
    String mqttBridgeHostname = "mqtt.googleapis.com";
    short mqttBridgePort = 8883;
    String projectId = 
    String cloudRegion = "europe-west1";
    String userName = "unused";
    String registryId = <I don't want to show>
    String gatewayId = <I don't want to show>
    String algorithm = "RS256";
    String command = "demo";
    String deviceId = <I don't want to show>
    String privateKeyFile = "rsa_private_pkcs8";
    String privateKeyFileEC = "ec_private_pkcs8";
    int numMessages = 100;
    int tokenExpMins = 20;
    String telemetryData = "Specify with -telemetry_data";
    String messageType = "config";
    int waitTime = 120;
}
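
To make explicit which strings these options produce, here is a small stand-alone sketch of the two `String.format` calls from the setup. The class name `TopicNames` and the values `my-project`, `my-registry` and `my-device` are placeholders invented for illustration; they are not my real IDs.

```java
// Hypothetical helper that repeats the two format strings used in the setup code.
public class TopicNames {

    // Client ID format used when creating the MqttClient.
    static String clientId(String projectId, String cloudRegion, String registryId, String deviceId) {
        return String.format(
                "projects/%s/locations/%s/registries/%s/devices/%s",
                projectId, cloudRegion, registryId, deviceId);
    }

    // Topic format used for client.publish(...).
    static String topic(String deviceId, String messageType) {
        return String.format("/devices/%s/%s", deviceId, messageType);
    }

    public static void main(String[] args) {
        // Placeholder values, not the real project, registry, or device IDs.
        System.out.println(clientId("my-project", "europe-west1", "my-registry", "my-device"));
        // With messageType = "config" the publish target is /devices/my-device/config
        System.out.println(topic("my-device", "config"));
        // With messageType = "state" it is /devices/my-device/state
        System.out.println(topic("my-device", "state"));
    }
}
```

So the only difference between the publish that works and the one that fails is the last path segment of the topic.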

When I try to publish a message to the "config" topic, I get this error:

ERROR 12556 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[.[dispatcherServlet]      : Servlet.service() for servlet [dispatcherServlet] in context with path [/iot-admin] threw exception [Request processing failed; nested exception is Connection Lost (32109) - java.io.EOFException] with root cause

java.io.EOFException: null
    at java.base/java.io.DataInputStream.readByte(DataInputStream.java:273) ~[na:na]
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.5.jar:na]
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.5.jar:na]

This is the message I am sending:

{
    "Led": {
        "id": "e36b5877-2579-4db1-b595-0e06410bde11",
        "rgbColors": [{
            "id": "1488acfe-baa7-4de8-b4a2-4e01b9f89fc5",
            "configName": "Terminal",
            "rgbColor": [150, 150, 150]
        }, {
            "id": "b8ce6a35-4219-4dba-a8de-a9070f17f1d2",
            "configName": "PayZone",
            "rgbColor": [150, 150, 150]
        }, {
            "id": "bf62cef4-8e22-4804-a7d8-a0996bef392e",
            "configName": "PayfreeLogo",
            "rgbColor": [150, 150, 150]
        }, {
            "id": "c62d25a4-678b-4833-9123-fe3836863400",
            "configName": "BagDetection",
            "rgbColor": [200, 200, 200]
        }, {
            "id": "e19e1ff3-327e-4132-9661-073f853cf913",
            "configName": "PersonDetection",
            "rgbColor": [150, 150, 150]
        }]
    }
}

How can I properly send a message to the "config" topic without getting this error? I am able to send a message to the "state" topic, but not to the "config" topic.
