MqttClient 消息可以发出,但是订阅后收不到消息。

发布于 2022-09-02 15:09:20 字数 3814 浏览 30 评论 0

package Ui
import Trait.mqttConnection
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import scalafx.application.JFXApp
import scalafx.application.JFXApp.PrimaryStage
import scalafx.geometry.Insets
import scalafx.scene.Scene
import scalafx.scene.control.{Button, TextField}
import scalafx.scene.input.MouseEvent
import scalafx.scene.layout.{HBox, VBox}
import scalafx.scene.paint.Color._
import scalafx.scene.paint.{LinearGradient, Stops}
import scalafx.scene.text.Text
import scalafx.Includes._

/**
 * ScalaFX application object for the "lebanIm" window.
 *
 * The window holds two text fields (recipient/topic and message body) and a
 * "send" button; clicking the button hands both values to `msgTest`.
 */
object uiMain extends JFXApp with mqttConnection {

  /**
   * Builds the input controls, wires the click handler of the send button and
   * installs the primary stage.
   *
   * NOTE(review): nothing inside this object calls `render()`; presumably the
   * `mqttConnection` trait or another entry point does — confirm.
   */
  def render(): Unit = {
    val sendTo = new TextField {
      promptText = "to:"
    }
    val msgContent = new TextField {
      promptText = "msg:"
    }
    val bt_send = new Button {
      text = "send"
    }
    bt_send.onMouseClicked = (event: MouseEvent) => {
      // Read the current String values with `text()`. Comparing the property
      // object itself (`sendTo.text.equals("")`) is never true, so the old
      // guard let an empty recipient through.
      val recipient = Option(sendTo.text()).getOrElse("")
      val body = Option(msgContent.text()).getOrElse("")
      // Only send when both fields have content; otherwise ignore the click.
      if (recipient.nonEmpty && body.nonEmpty) {
        msgTest(recipient, body)
      }
    }
    stage = new PrimaryStage {
      title = "lebanIm"
      width = 400
      height = 600
      scene = new Scene {
        // Both gradient stops use the same colour, so the fill is uniform.
        fill = new LinearGradient(
          endX = 0,
          stops = Stops(DodgerBlue, DodgerBlue)
        )
        content = new VBox() {
          padding = Insets(5)
          children = List(
            sendTo, msgContent, bt_send
          )
        }
      }
    }
  }
}

以上为UI部分,可忽略。
下面是问题代码。


/**
 * Connects to the MQTT broker, subscribes to the inbox topic and publishes one
 * message. All of this happens while the instance is being constructed.
 *
 * NOTE(review): the subscription topic is hard-coded to "msg/user/admin" while
 * the publish topic is whatever is passed as `to`. The callback only sees the
 * published message when `to` matches that subscription — confirm the intended
 * topic scheme.
 *
 * NOTE(review): every instance opens a new connection with the same client id
 * ("admin") and never disconnects it. Per the MQTT specification a broker drops
 * the older connection when a second client connects with the same id — confirm
 * whether one shared, long-lived client is what is wanted here.
 *
 * @param to  topic the message is published to
 * @param msg text payload of the message
 */
case class msgTest(to: String, msg: String) {
  println("msgTest---------------------------------------------")
  val topic: String = to
  val content: String = msg
  val qos: Int = 1
  val broker: String = "tcp://0.0.0.0:61613"
  val clientId: String = "admin"
  val persistence: MemoryPersistence = new MemoryPersistence()

  val sampleClient: MqttClient = new MqttClient(broker, clientId, persistence)
  val connOpts: MqttConnectOptions = new MqttConnectOptions()
  connOpts.setUserName("admin")
  connOpts.setCleanSession(false)
  // Keep-alive interval in seconds. The previous value of 1 made the session
  // depend on a ping round-trip every second; 60 is the Paho default.
  connOpts.setKeepAliveInterval(60)
  val pwd = "password".toCharArray
  connOpts.setPassword(pwd)
  println("Connecting to broker: " + broker)
  // The callback must be registered before connecting so no message is missed.
  sampleClient.setCallback(new callBack)
  sampleClient.connect(connOpts)
  println("Connected to broker: " + broker)
  // Subscribe once, before publishing, so a matching message can be received.
  listen()
  send()

  /**
   * Publishes `content` to `topic` with the configured QoS.
   * An `MqttException` is logged, not rethrown.
   */
  def send(): Unit = {
    try {
      // Encode explicitly as UTF-8 instead of the platform default charset.
      val message: MqttMessage =
        new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8))
      message.setQos(qos)
      sampleClient.publish(topic, message)
      println("Published to topic: " + topic + " ,msg: " + msg)
    } catch {
      case me: MqttException => logMqttFailure(me)
    }
  }

  /**
   * Subscribes to the fixed topic "msg/user/admin" at QoS 0.
   * An `MqttException` is logged, not rethrown.
   */
  def listen(): Unit = {
    try {
      println("Listening to topic 'msg/user/admin'...")
      sampleClient.subscribe("msg/user/admin", 0)
    } catch {
      case me: MqttException => logMqttFailure(me)
    }
  }

  /** Prints the details of a failed MQTT operation to stdout and its stack trace to stderr. */
  private def logMqttFailure(me: MqttException): Unit = {
    println("reason " + me.getReasonCode())
    println("msg " + me.getMessage())
    println("loc " + me.getLocalizedMessage())
    println("cause " + me.getCause())
    println("excep " + me)
    me.printStackTrace()
  }
}

callback部分

/**
 * MQTT callback that reports client events on standard output.
 */
class callBack extends MqttCallback {

  /** Prints the delivery token once the client reports delivery as complete. */
  override def deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken): Unit =
    println(iMqttDeliveryToken)

  /**
   * Prints the received message.
   *
   * @param s           topic the message arrived on (not printed)
   * @param mqttMessage the received message
   */
  override def messageArrived(s: String, mqttMessage: MqttMessage): Unit = {
    println("messageArrived...." + mqttMessage.toString)
  }

  /**
   * Reports a dropped connection together with its cause, so the reason for
   * the drop is not silently discarded.
   *
   * @param throwable reason the connection was lost; guarded against null
   */
  override def connectionLost(throwable: Throwable): Unit = {
    println("lost connection")
    Option(throwable).foreach { cause =>
      println("cause " + cause)
      cause.printStackTrace()
    }
  }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问、参与讨论,获取更多帮助,或者扫描二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验,包括保存您的登录状态等。请阅读我们的 隐私政策 了解更多相关信息。单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文