概要:基本数据检索地址端口发送订阅主题请求参数检索。 发送的函数是:
# -*-coding:utf-8 -*-from multiprocessing import Processimport paho.mqtt.publish as publicfrom Data.data import *import paho .mqtt.subscribe as subscribefrom common.yaml_common_method import *导入时间,json,yaml,os"""获取基础数据yaml"""data = yaml_method_read("data")host = data["host"]#IP地址 prot = data["port"] #端口 pub_url = data["pub_url"]+"/register"#发送 sub_url = data["sub_url"]+"/register /response"#订阅"""主题请求参数yaml get"" "publish_yaml = yaml_method_read("发布")publish_msg = public_yaml["upload_service"]client_id = time.strftime("mq:test:%Y%m%d% H%M%S",time.localtime(time.time( ))) class Mqtt_method_info( ): def mqtt_publish(self): #提交的函数为 msg = public_msg print("-----------------[k4 ] ]------------Send_sub---[k4 ] ----------------[ k4] ]---------") print(pub_url) print("---[k4 ][ k4]---------------[ k4] ------- 发送消息 --------[ k4][ k4]--------------- - --") print(msg) msg = json.dumps(msg) public.single(pub_url, msg, qos=1, hostname=host, port=prot, client_id=client_id) print ( “[ k4] --------------- [ k4] [k4 ]------------End_send----------------[ k4]-------------") def mqtt_subscribe(self): #这是订阅的函数 print("-------------[ k4] ------------订阅主题--- [k4 ][ k4]--------------[k4 ][ k4] -----") print(sub_url) msg = subscribe.simple(sub_url, qos=1, hostname=host, port=prot, client_id=client_id ) msg_payload = json .loads (msg.payload.decode("utf-8")) print("-------[ k4]- ----------------- ][ k4] 回复消息 ------------- -[k4 ]- ]------------") print(msg_payload) yaml_method_write("mqtt_return_data",msg_payload) defimplement(self): pub = Process(target=Mqtt_method_info().mqtt_publish)#订阅线程 sub = Process(target=Mqtt_method_info().mqtt_subscribe)#订阅线程 sub.start() time.sleep(1) pub.start() sub.join () print ("执行完成")if __name__ == "__main__": Mqtt_method_info().implement()
我们来谈谈 python-mqtt 测试脚本:
Python 的 paho [ 只需安装 k4]如果失败,您很可能需要使用镜像站。
此代码将使用 yaml 读取发送的数据,如果您需要使用一种方法,您只需运行它,更改发送和订阅功能,并通过不同的文件自行管理数据。
评论前必须登录!
注册