import json import paho.mqtt.client as mqtt from config import Config from logger import get_log # 创建一个 MQTT 客户端实例,并指定使用 V2 回调 API # 这将消除 DeprecationWarning 警告 client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) _log = get_log() topics = [ "test/topic", "test/topic2", ] user_data = { "topics": json.dumps(topics).encode("utf-8"), } client.user_data_set(user_data) # 定义连接成功时的回调函数 def on_connect(client, userdata, flags, rc, properties): """ 当客户端成功连接到 MQTT 代理时,这个函数会被调用。 rc (return code) 的值为 0 表示成功。 """ if rc == 0: _log.info("成功连接到 MQTT 服务器!") # 订阅一个主题。 # 当连接成功后,客户端会自动重新订阅之前订阅的主题。 topic_list = json.loads(userdata["topics"].decode("utf-8")) for topic in topic_list: _log.info(f"订阅主题:{topic}") client.subscribe(topic) else: _log.error(f"连接失败,返回码:{rc}") # 定义接收到消息时的回调函数 def on_message(client, userdata, msg): """ 当客户端接收到来自代理的消息时,这个函数会被调用。 消息内容在 msg.payload 中,需要解码成字符串。 """ _log.info(f"收到消息来自主题:{msg.topic}") _log.info(f"消息内容:{msg.payload.decode('utf-8')}") # 绑定回调函数 client.on_connect = on_connect client.on_message = on_message # 连接 MQTT def mqtt_connect(): """ """ _config = Config() # 设置连接参数 broker_address = _config.broker_address port = _config.broker_port # MQTT 默认端口 _log.info(f"正在连接{broker_address}:{port}") try: client.connect(broker_address, port, 60) client.loop_start() except Exception as e: _log.exception(e) # 断开 MQTT 连接的函数 def mqtt_disconnect(): """ 停止 MQTT 客户端循环并断开连接 """ _log.info("正在断开 MQTT 连接...") client.loop_stop() client.disconnect() _log.info("MQTT 连接已断开。")