MQTT client: publish conditionally based on an incoming message

Posted on 2025-01-23 22:35:22

I am experimenting with MQTT using the Python paho-mqtt library and an MQTT client mobile app, with the test.mosquitto.org server/broker.

The basic script below works: it connects to the test.mosquitto.org server, where I can publish a message from the mobile MQTT client app to this script, and the script can also publish a test message to the mobile app every 20 seconds via the def publish(client): function.

import random
import time
from paho.mqtt import client as mqtt_client


broker = 'test.mosquitto.org'
port = 1883 


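# fixed client ID; test.mosquitto.org is a public broker, so a unique ID avoids clashes
client_id = "test_1"
topic_to_publish = "laptop/publish"
topic_to_listen = "mobile/publish"
topic_to_wildcard = "testing/*"  # not used for subscribing; MQTT wildcards are '#' and '+', not '*'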

username = ""
password = ""


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            client.subscribe(topic_to_listen)
            print(f"Connected to MQTT Broker, subscribed to topic: {topic_to_listen}")
        else:
            print(f"Failed to connect, return code {rc}")

    client = mqtt_client.Client(client_id)  # paho-mqtt 1.x signature; 2.x also needs a callback API version argument
    client.username_pw_set(username, password)
    client.on_connect = on_connect  # Define callback function for successful connection
    client.on_message = on_message  # Define callback function for receipt of a message
    client.connect(broker, port)
    return client


def publish(client):
    msg_count = 0
    while True:
        time.sleep(20)
        msg = f"hello from {client_id}: {msg_count}"
        result = client.publish(topic_to_publish, msg)
        
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send {msg} to topic {topic_to_publish}")
        else:
            print(f"Failed to send message to topic {topic_to_publish}")
        msg_count += 1



def on_message(client, userdata, msg):  # The callback for when a PUBLISH message is received from the server.
    print("Message received-> " + msg.topic + " " + str(msg.payload))



def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

Can someone give me a tip on how to modify the def publish(client): function so that, instead of a while loop that fires off messages every 20 seconds, it only publishes when the message received from the mobile app equals the string "zone temps"?

Am I on the right track removing the publish(client) call from the main run function as well as the while loop from def publish(client):? Any tips are greatly appreciated. What I am running into is that I am missing something: when I run this modified version, there is no message exchange at all.

def on_message(client, userdata, msg):  
    print("Message received-> " + msg.topic + " " + str(msg.payload))
    
    if str(msg.payload) == "zone temps":
        publish(client,"avg=72.1;min=66.4;max=78.8")
        

def run():
    client = connect_mqtt()
    client.loop_start()


if __name__ == '__main__':
    run()

Comments (1)

记忆で 2025-01-30 22:35:22

I'm also a beginner, but I would create a variable that records whether the script should publish or just listen, like:

phoneAppListener = 0

Also, regarding

if str(msg.payload) == "zone temps":

when I print my payload it looks like:

b'payload'

so first you need to split your payload like:

tempMsgHolder = str(msg.payload).split("'")

When you do this, tempMsgHolder[1] is your pure payload.
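
The b'payload' output appears because paho-mqtt hands msg.payload to the callback as a bytes object, and str() of bytes gives its printed form with the b prefix and quotes included, so it can never equal "zone temps". A small sketch of that, with decoding shown as a possible alternative to splitting on the quote character (the variable names are only for illustration):

```python
payload = b"zone temps"  # stand-in for msg.payload, which arrives as bytes

as_str = str(payload)                 # printed form of the bytes object, prefix and quotes included
via_split = as_str.split("'")[1]      # the splitting approach: take the part between the quotes
via_decode = payload.decode("utf-8")  # alternative: decode the bytes directly

print(as_str == "zone temps")      # False
print(via_split == "zone temps")   # True
print(via_decode == "zone temps")  # True
```

Splitting stops working if the payload itself contains an apostrophe, so decoding is probably the safer of the two.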

if tempMsgHolder[1] == "zone temps": phoneAppListener = 1

The value of phoneAppListener makes the decision: 0 means listen, 1 means publish. In your publish loop you check it:

if phoneAppListener == 1: publish your message
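
The flag idea could be sketched roughly like this. It is only a sketch under assumptions: a threading.Event takes the place of the 0/1 variable, and publish_requested and publish_if_requested are made-up names, not part of paho-mqtt. The callback only records the request, and the main loop does the publishing.

```python
import threading

# Plays the role of phoneAppListener: set = publish, clear = just listen.
publish_requested = threading.Event()


def on_message(client, userdata, msg):
    """Runs on the MQTT network thread: only record that a reply is wanted."""
    if msg.payload.decode("utf-8", errors="replace") == "zone temps":
        publish_requested.set()


def publish_if_requested(client, topic, text):
    """Call this from the main loop; publishes at most once per request."""
    if not publish_requested.is_set():
        return False
    publish_requested.clear()  # back to listening
    client.publish(topic, text)
    return True
```

In the main loop you would call publish_if_requested(client, topic_to_publish, "...") repeatedly with a short time.sleep() in between, instead of publishing unconditionally every 20 seconds.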


import time
import threading
from paho.mqtt import client as mqtt_client


class moduleDatas:
    broker = "test.mosquitto.org"
    port = 1883

    client_id = "test_1"  # fixed ID; use a unique one on a public broker
    topic_to_publish = "laptop/publish"
    topic_to_listen = "mobile/publish"

    username = ""
    password = ""

# Create the client object.
# Further clients (each with its own callbacks) can be created following the same pattern.
mqttClient_1 = mqtt_client.Client(moduleDatas.client_id)  # paho-mqtt 1.x signature

def mqttClientConnect():
    mqttClient_1.connect(moduleDatas.broker, moduleDatas.port)
    mqttClient_1.loop_start()  # starts a background thread that handles the MQTT network traffic

def mqttClientSubscribe():
    mqttClient_1.subscribe(moduleDatas.topic_to_listen)

@mqttClient_1.connect_callback()
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        mqttClientSubscribe()  # subscribe here so the subscription is renewed after a reconnect
        print(f"Connected to MQTT Broker, subscribed to topic: {moduleDatas.topic_to_listen}")
    else:
        print(f"Failed to connect, return code {rc}")

@mqttClient_1.publish_callback()
def on_publish(client, userdata, mid):
    print(mid)  # mid is the message ID of the published message; it matches the mid returned by publish()

@mqttClient_1.message_callback()
def on_message(client, userdata, message):
    # str(b'zone temps') is "b'zone temps'", so index 1 holds the text between the quotes;
    # message.payload.decode() is a more robust alternative.
    temp_str = str(message.payload).split("'")
    if temp_str[1] == "zone temps":
        msg = "hello world"  # <-- your message here: a function's return value or plain text
        mqttClient_1.publish(topic=moduleDatas.topic_to_publish, payload=msg, qos=0)

def threadMqttClient1():
    mqttClientConnect()

def buildThreads():
    threads = []
    t = threading.Thread(target=threadMqttClient1, daemon=True)  # pass the function itself, do not call it
    threads.append(t)
    # More threads can be created with the same pattern and appended to the list.
    for t in threads:
        t.start()
    while True:  # main thread: do other work here; this one just idles
        time.sleep(1)

if __name__ == "__main__":
    buildThreads()
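
A shorter variant, closer to the script in the question, might look like the sketch below. As far as I can tell, the modified version in the question goes quiet for a few reasons: run() returns right after loop_start(), whose background thread is a daemon thread, so the script ends; publish(client, "...") is called with two arguments while def publish(client) takes one; and str(msg.payload) never equals "zone temps". The sketch assumes the topics from the question; REQUEST, REPLY_TOPIC, LISTEN_TOPIC and build_reply are made-up names, and the try/except is only there to cope with both the paho-mqtt 1.x and 2.x constructors.

```python
REQUEST = "zone temps"
REPLY_TOPIC = "laptop/publish"
LISTEN_TOPIC = "mobile/publish"


def build_reply():
    """Placeholder reading; the values are the ones used in the question."""
    return "avg=72.1;min=66.4;max=78.8"


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.subscribe(LISTEN_TOPIC)  # subscribe once the broker has accepted the connection


def on_message(client, userdata, msg):
    text = msg.payload.decode("utf-8", errors="replace")  # bytes -> str
    print(f"Message received-> {msg.topic} {text}")
    if text == REQUEST:
        client.publish(REPLY_TOPIC, build_reply())  # reply only to the expected request


def run(broker="test.mosquitto.org", port=1883, client_id="test_1"):
    # Imported here so the handlers above can be tried out without the library installed.
    from paho.mqtt import client as mqtt_client

    try:
        # paho-mqtt 2.x: the callback API version comes first
        client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id)
    except AttributeError:
        # paho-mqtt 1.x: no CallbackAPIVersion
        client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(broker, port)
    client.loop_forever()  # blocks, so the script stays alive to receive messages
```

Calling run() under if __name__ == '__main__': as in the question should be enough; loop_forever() replaces both loop_start() and the idle while loop.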
