WildAssistant/python/mqtt_client.py
2025-08-18 02:28:52 +08:00

88 lines
2.2 KiB
Python

import json
import paho.mqtt.client as mqtt
from config import Config
from logger import get_log
# 创建一个 MQTT 客户端实例,并指定使用 V2 回调 API
# 这将消除 DeprecationWarning 警告
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
_log = get_log()
topics = [
"test/topic",
"test/topic2",
]
user_data = {
"topics": json.dumps(topics).encode("utf-8"),
}
client.user_data_set(user_data)
# 定义连接成功时的回调函数
def on_connect(client, userdata, flags, rc, properties):
"""
当客户端成功连接到 MQTT 代理时,这个函数会被调用。
rc (return code) 的值为 0 表示成功。
"""
if rc == 0:
_log.info("成功连接到 MQTT 服务器!")
# 订阅一个主题。
# 当连接成功后,客户端会自动重新订阅之前订阅的主题。
topic_list = json.loads(userdata["topics"].decode("utf-8"))
for topic in topic_list:
_log.info(f"订阅主题:{topic}")
client.subscribe(topic)
else:
_log.error(f"连接失败,返回码:{rc}")
# 定义接收到消息时的回调函数
def on_message(client, userdata, msg):
"""
当客户端接收到来自代理的消息时,这个函数会被调用。
消息内容在 msg.payload 中,需要解码成字符串。
"""
_log.info(f"收到消息来自主题:{msg.topic}")
_log.info(f"消息内容:{msg.payload.decode('utf-8')}")
# 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message
# 连接 MQTT
def mqtt_connect():
"""
"""
_config = Config()
# 设置连接参数
broker_address = _config.broker_address
port = _config.broker_port # MQTT 默认端口
_log.info(f"正在连接{broker_address}:{port}")
try:
client.connect(broker_address, port, 60)
client.loop_start()
except Exception as e:
_log.exception(e)
# 断开 MQTT 连接的函数
def mqtt_disconnect():
"""
停止 MQTT 客户端循环并断开连接
"""
_log.info("正在断开 MQTT 连接...")
client.loop_stop()
client.disconnect()
_log.info("MQTT 连接已断开。")