【实战】ThingsBoard 通过 MQTT RPC 控制继电器设备(完整教程)

1. 任务说明

本文基于 ThingsBoard 物联网平台,通过 MQTT RPC(远程过程调用) 实现对继电器设备的远程控制。
核心目标是解决物联网设备控制的核心问题:如何让平台精准控制设备,并确认设备的执行结果,同时理清以下关键疑问:

  • 什么是 RPC?为什么控制类场景必须用 RPC 而非普通属性设置?
  • RPC 与传统属性设置的本质区别是什么?
  • ThingsBoard 中 RPC 的请求/返回格式如何设计?如何利用平台的灵活性适配自定义返回值?
  • setValue 的返回值平台是否解析?如何判定指令执行成功?

2. 知识储备

2.1 什么是 RPC,为什么要使用 RPC

RPC(Remote Procedure Call)即远程过程调用,核心是:平台像调用本地函数一样,远程执行设备上的方法,并获取执行结果

在继电器控制场景中,RPC 是最优选择,核心原因是:

RPC 能在应用层确认请求是否被设备真正执行,而非仅确认 MQTT 数据传输层是否送达

具体优势:

  • 双向交互:平台下发指令 → 设备执行 → 返回结果,形成完整闭环;
  • 结果可追溯:明确知道设备「是否收到指令、是否执行成功、执行后状态」;
  • 实时性强:请求-响应模式确保指令即时生效,适配继电器这类需要精准控制的场景;
  • 可靠性高:基于 MQTT QoS=1 保障指令不丢失,应用层异常可直接返回失败原因。

2.2 RPC 方式和属性设置方式的核心区别

很多新手会疑惑:“既然属性也能下发参数,为什么还要用 RPC?” 用一句话总结:

平台下发属性是「我发了,你看着办」;RPC 是「我发指令,你必须告诉我做没做、做成什么样」

对比维度 RPC 远程调用 属性设置(客户端属性)
交互模式 双向:请求 → 响应 单向:平台下发,无强制响应
结果确认 应用层明确返回执行结果(成功/失败/状态) 仅确认数据传输送达,不保证设备执行
实时性 即时响应,控制类场景首选 依赖设备主动同步,实时性差
适用场景 继电器开关、电机启停、阀门控制 配置参数、固件版本、设备描述

对继电器这类需要精准控制、必须确认开关状态的设备,RPC能在应用层保障执行的确定性,而非仅停留在MQTT传输层的送达确认,因此是更合理的方案。

2.3 RPC 请求与返回数据格式

2.3.1 获取状态(getValue)

在这里插入图片描述

下面是我实际运行时的日志,可以清晰看到完整格式:

2026-03-01 22:23:06,467 - INFO -    ├─ 请求主题:v1/devices/me/rpc/request/93
2026-03-01 22:23:06,467 - INFO -    ├─ 请求ID:93
2026-03-01 22:23:06,467 - INFO -    └─ 请求内容:{"method":"getValue","params":null}
2026-03-01 22:23:06,467 - INFO - 📤 回复getValue -> 继电器1状态:True
2026-03-01 22:23:06,467 - INFO - 📤 准备回复RPC请求:
2026-03-01 22:23:06,467 - INFO -    ├─ 返回主题:v1/devices/me/rpc/response/93
2026-03-01 22:23:06,468 - INFO -    └─ 返回内容:true
  • 请求主题:v1/devices/me/rpc/request/{请求ID}
  • 请求内容:固定 JSON 结构,method 为方法名,params 为参数
  • 返回主题:v1/devices/me/rpc/response/{请求ID}(必须和请求 ID 对应)
  • 返回内容:本例直接返回 true/false,无 JSON 封装(匹配平台默认解析函数)

在这里插入图片描述

2.3.2 控制状态(setValue)

在这里插入图片描述

报文内容:

📩 收到RPC请求:
2026-03-01 22:54:47,213 - INFO -    ├─ 请求主题:v1/devices/me/rpc/request/96
2026-03-01 22:54:47,213 - INFO -    ├─ 请求ID:96
2026-03-01 22:54:47,213 - INFO -    └─ 请求内容:{"method":"setValue","params":true}
2026-03-01 22:54:47,213 - INFO - ⚡ 【硬件执行】继电器1打开
2026-03-01 22:54:47,213 - INFO - 📤 准备回复RPC请求:
2026-03-01 22:54:47,213 - INFO -    ├─ 回复主题:v1/devices/me/rpc/response/96
2026-03-01 22:54:47,213 - INFO -    └─ 回复内容:{"status": "success", "message": "继电器1已打开", "relay_id": 1, "current_status": true}
  • 这里是转换值函数,非解析返回值:将开关状态(true/false)转为下发给设备的 params: true

  • 平台下发的请求格式:

    {"method":"setValue","params":true}
    
  • 设备返回的响应格式(JSON,仅用于调试/日志):

    {"status": "success", "message": "继电器1已打开", "relay_id": 1, "current_status": true}
    
2.3.3 核心关键点:平台对 setValue 的判定逻辑 (有点奇葩

新手必看:平台不解析 setValue 返回的 JSON 内容,仅通过以下两点判定指令是否成功:

  1. 响应时效性:是否在超时时间(默认500ms)内收到响应;
  2. 响应主题合法性:返回主题的 request_id 与请求完全一致。

只要满足以上两点,哪怕返回空字符串、错误JSON、乱码,平台都判定 setValue 成功;仅“超时未响应”或“request_id 不匹配”才判定失败。

  • getValue 返回值:用于驱动开关组件显示,格式需与解析函数匹配;
  • setValue 返回值:仅用于开发者调试/日志,开关组件不解析,状态更新依赖后续 getValue 请求。

3. 任务实施

3.1 创建继电器产品

在这里插入图片描述

  • 无需提前配置属性,产品仅用于设备归类;
  • 设备的“可控制属性”由后续仪表盘开关组件定义。

3.2 创建继电器设备

在这里插入图片描述

  • 新建设备并归属到继电器产品;
  • 记录设备访问令牌(代码认证用)。

3.3 添加设备到仪表盘

这是 RPC 控制的核心步骤,只有绑定开关组件才能下发 RPC 指令:

  1. 新建仪表盘,添加“开关控件(Switch)”;
  2. 配置控件数据源为目标继电器设备;
  3. 设置 RPC 方法:
    • 查询状态:getValue
    • 下发控制:setValue
  4. 保持解析/转换函数默认配置:
    • 解析值函数:return data;(解析 getValue 布尔值)
    • 转换值函数:return value;(转换开关状态为 params)

保存后即可在仪表盘控制继电器。


4. 客户端代码

import json
import logging
import time
import threading
from paho.mqtt import client as mqtt_client
from paho.mqtt.client import MQTTv311

# ===================== 继电器设备核心配置 ======================
MQTT_BROKER = "192.168.111.53"
MQTT_PORT = 2883
MQTT_USERNAME = "p78RQ9DiwYhXiU8ULlOW"  # 继电器设备访问令牌
MQTT_PASSWORD = ""
CLIENT_ID = "python-mqtt-relay-client"  # 继电器专属ClientID

# MQTT主题配置(仅保留核心)
RPC_REQUEST_SUB_TOPIC = "v1/devices/me/rpc/request/+"  # 订阅平台RPC请求
RPC_RESPONSE_PUB_TOPIC = "v1/devices/me/rpc/response/"  # 发布RPC响应
TELEMETRY_PUB_TOPIC = "v1/devices/me/telemetry"        # 上报继电器状态

# 继电器全局状态(1号继电器,False=关,True=开)
RELAY_STATUS = {
    1: False
}

# 全局定时器(周期性上报继电器状态)
report_timer = None
REPORT_INTERVAL = 10  # 上报间隔(秒)

# 日志配置
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("Relay-RPC-Client")


def connect_mqtt() -> mqtt_client.Client:
    """建立MQTT连接(适配继电器设备)"""
    reconnect_count = 0

    def on_connect(client, userdata, flags, rc, properties=None):
        nonlocal reconnect_count
        reconnect_count = 0
        rc_msg = {
            0: "连接成功",
            1: "协议版本错误",
            2: "客户端ID非法",
            3: "服务器不可用",
            4: "用户名/密码错误",
            5: "未授权",
        }
        if rc == 0:
            logger.info(f"✅ MQTT 3.1.1 连接成功({MQTT_BROKER}:{MQTT_PORT})")
            # 打印所有核心主题配置
            logger.info("📋 核心MQTT主题配置:")
            logger.info(f"   ├─ RPC请求订阅主题:{RPC_REQUEST_SUB_TOPIC}")
            logger.info(f"   ├─ RPC响应发布主题(前缀):{RPC_RESPONSE_PUB_TOPIC}")
            logger.info(f"   └─ 继电器状态上报主题:{TELEMETRY_PUB_TOPIC}")
            # 连接成功后订阅RPC请求
            subscribe_rpc(client)
            # 启动继电器状态周期性上报
            start_report_timer(client)
            # 上报初始状态
            publish_relay_status(client)
        else:
            logger.error(f"❌ 连接失败(rc={rc}):{rc_msg.get(rc, '未知错误')}")

    def on_disconnect(client, userdata, rc, properties=None):
        nonlocal reconnect_count
        reconnect_count += 1
        if rc == 0:
            logger.info("🔌 主动断开连接")
        else:
            disconnect_msg = {
                1: "协议错误",
                2: "客户端ID重复",
                3: "服务器不可用",
                4: "用户名密码错误",
                5: "未授权",
                128: "订阅越权",
            }
            logger.warning(
                f"⚠️  被动断开(rc={rc}):{disconnect_msg.get(rc, '服务器主动断开')},第{reconnect_count}次重连..."
            )
            time.sleep(min(reconnect_count * 2, 10))

    client = mqtt_client.Client(
        client_id=CLIENT_ID,
        callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1,
        protocol=MQTTv311,
    )
    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.auto_reconnect = True
    client.reconnect_delay_set(min_delay=2, max_delay=10)

    try:
        client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
    except Exception as e:
        logger.error(f"❌ TCP连接失败:{str(e)}")
        raise
    return client


def control_relay(relay_id: int, is_on: bool) -> dict:
    """
    继电器控制核心函数(适配平台布尔值参数)
    :param relay_id: 继电器编号
    :param is_on: True=打开,False=关闭
    :return: 执行结果字典
    """
    global RELAY_STATUS
    # 参数校验
    if relay_id not in RELAY_STATUS:
        return {
            "status": "failed",
            "message": f"继电器{relay_id}不存在(仅支持:{list(RELAY_STATUS.keys())})"
        }
    
    # 硬件控制逻辑(模拟,可替换为实际硬件控制)
    RELAY_STATUS[relay_id] = is_on
    action = "打开" if is_on else "关闭"
    result = {
        "status": "success",
        "message": f"继电器{relay_id}{action}",
        "relay_id": relay_id,
        "current_status": is_on
    }
    logger.info(f"⚡ 【硬件执行】继电器{relay_id}{action}")
    
    return result


def on_rpc_request(client, userdata, msg):
    """处理平台下发的继电器RPC控制请求(适配setValue/getValue)"""
    # 提取RPC请求ID(回复必须携带)
    request_id = msg.topic.split("/")[-1]
    payload = msg.payload.decode("utf-8", errors="ignore")
    # 打印收到请求的完整主题和内容
    logger.info(f"\n📩 收到RPC请求:")
    logger.info(f"   ├─ 请求主题:{msg.topic}")
    logger.info(f"   ├─ 请求ID:{request_id}")
    logger.info(f"   └─ 请求内容:{payload}")

    try:
        rpc_request = json.loads(payload)
        method = rpc_request.get("method")
        params = rpc_request.get("params")  # 不再默认给字典,保留原始类型
        relay_id = 1  # 平台未传relay_id,固定控制1号继电器(可根据需求调整)

        # 处理平台的setValue/getValue方法
        if method == "setValue":
            # params是布尔值:True=开,False=关
            if not isinstance(params, bool):
                result = {"status": "failed", "message": "setValue参数必须是布尔值(true/false)"}
            else:
                result = control_relay(relay_id, params)
        elif method == "getValue":
            # getValue无参数,返回当前继电器状态(平台期望直接返回布尔值)
            current_status = RELAY_STATUS.get(relay_id, False)
            # 注意:平台可能期望直接返回布尔值,而非字典,这里兼容两种格式
            result = current_status
            logger.info(f"📤 回复getValue -> 继电器{relay_id}状态:{current_status}")
        else:
            result = {
                "status": "failed",
                "message": f"不支持的RPC方法:{method},仅支持 setValue / getValue"
            }

        # 回复平台RPC请求(适配getValue直接返回布尔值的场景)
        response_topic = f"{RPC_RESPONSE_PUB_TOPIC}{request_id}"
        # 打印回复的主题
        logger.info(f"📤 准备回复RPC请求:")
        logger.info(f"   ├─ 回复主题:{response_topic}")
        # 如果是布尔值,直接转字符串;否则序列化JSON
        if isinstance(result, bool):
            payload = str(result).lower()  # 转小写:true/false
            logger.info(f"   └─ 回复内容:{payload}")
        else:
            payload = json.dumps(result, ensure_ascii=False)
            logger.info(f"   └─ 回复内容:{payload}")
        client.publish(response_topic, payload, qos=1)

    except json.JSONDecodeError:
        result = {"status": "failed", "message": "请求内容不是合法的JSON格式"}
        response_topic = f"{RPC_RESPONSE_PUB_TOPIC}{request_id}"
        logger.error(f"❌ RPC解析失败:")
        logger.error(f"   ├─ 请求主题:{msg.topic}")
        logger.error(f"   ├─ 请求内容:{payload}")
        logger.error(f"   └─ 回复主题:{response_topic}")
        client.publish(response_topic, json.dumps(result), qos=1)
    except Exception as e:
        result = {"status": "failed", "message": f"处理RPC请求出错:{str(e)}"}
        response_topic = f"{RPC_RESPONSE_PUB_TOPIC}{request_id}"
        logger.error(f"❌ RPC处理异常:")
        logger.error(f"   ├─ 请求主题:{msg.topic}")
        logger.error(f"   ├─ 异常信息:{str(e)}")
        logger.error(f"   └─ 回复主题:{response_topic}")
        client.publish(response_topic, json.dumps(result), qos=1)
        logger.error(f"❌ RPC处理异常详情:{str(e)}", exc_info=True)


def subscribe_rpc(client: mqtt_client.Client):
    """订阅平台RPC请求主题"""
    # 打印订阅操作
    logger.info(f"\n📌 开始订阅RPC请求主题:")
    logger.info(f"   ├─ 订阅主题:{RPC_REQUEST_SUB_TOPIC}")
    logger.info(f"   └─ QoS级别:1")
    client.subscribe(RPC_REQUEST_SUB_TOPIC, qos=1)
    client.message_callback_add(RPC_REQUEST_SUB_TOPIC, on_rpc_request)
    logger.info(f"✅ 成功订阅RPC请求主题:{RPC_REQUEST_SUB_TOPIC},等待平台控制指令...")


def publish_relay_status(client: mqtt_client.Client):
    """上报继电器状态到平台(用于仪表盘可视化)"""
    if client.is_connected():
        # 构造上报数据(仅包含继电器状态)
        telemetry_data = {f"relay_{k}_status": v for k, v in RELAY_STATUS.items()}
        telemetry_data["timestamp"] = int(time.time() * 1000)  # 时间戳
        payload = json.dumps(telemetry_data, ensure_ascii=False)
        
        # 打印上报主题和数据
        logger.info(f"\n📡 准备上报继电器状态:")
        logger.info(f"   ├─ 上报主题:{TELEMETRY_PUB_TOPIC}")
        logger.info(f"   └─ 上报数据:{telemetry_data}")
        
        result = client.publish(TELEMETRY_PUB_TOPIC, payload, qos=1)
        if result[0] == 0:
            logger.info(f"✅ 上报继电器状态成功(状态码:{result[0]})")
        else:
            logger.error(f"❌ 上报继电器状态失败(状态码:{result[0]})")
    else:
        logger.warning(f"⚠️ MQTT未连接,跳过本次状态上报(目标主题:{TELEMETRY_PUB_TOPIC})")


def start_report_timer(client: mqtt_client.Client):
    """启动继电器状态周期性上报定时器"""
    global report_timer
    # 取消原有定时器避免重复
    if report_timer is not None and report_timer.is_alive():
        report_timer.cancel()
    # 新建定时器
    report_timer = threading.Timer(REPORT_INTERVAL, publish_relay_status, args=[client])
    report_timer.daemon = True
    report_timer.start()
    logger.debug(f"⏰ 已设置下次状态上报定时器({REPORT_INTERVAL}秒后,上报主题:{TELEMETRY_PUB_TOPIC})")


def run():
    """主函数:启动继电器RPC控制客户端"""
    logger.info("🚀 启动继电器MQTT RPC控制客户端(适配setValue/getValue)")
    # 打印基础配置和主题
    logger.info("🔧 客户端基础配置:")
    logger.info(f"   ├─ MQTT服务器:{MQTT_BROKER}:{MQTT_PORT}")
    logger.info(f"   ├─ 设备令牌:{MQTT_USERNAME}")
    logger.info(f"   ├─ Client ID:{CLIENT_ID}")
    logger.info(f"   └─ 上报间隔:{REPORT_INTERVAL}秒")
    client = connect_mqtt()
    
    try:
        # 阻塞运行MQTT客户端,监听RPC请求
        client.loop_forever()
    except KeyboardInterrupt:
        # 优雅退出
        global report_timer
        if report_timer is not None:
            report_timer.cancel()
        logger.info("\n🛑 程序退出,断开MQTT连接...")
        client.loop_stop()
        client.disconnect()


if __name__ == "__main__":
    try:
        run()
    except Exception as e:
        logger.error(f"❌ 程序异常:{str(e)}", exc_info=True)

代码说明

  • 基于 MQTT 3.1.1 接入 ThingsBoard,设备访问令牌认证;
  • 订阅 RPC 请求主题,接收 setValue/getValue 指令;
  • setValue:接收布尔值控制继电器,返回 JSON 格式执行结果(仅调试用);
  • getValue:返回布尔值状态,适配平台默认解析函数;
  • 支持断线重连、异常处理、状态周期性上报;
  • 硬件控制为模拟逻辑,可直接对接实际硬件驱动。

5. 代码测试

  1. 修改代码中 MQTT 服务器地址与设备访问令牌;
  2. 运行程序,确认日志显示“MQTT 连接成功”“订阅 RPC 主题成功”;
  3. 在 ThingsBoard 仪表盘操作开关组件;
  4. 验证客户端日志:
    • 收到 setValue/getValue RPC 请求;
    • 执行继电器控制逻辑;
    • 回复正确格式的响应;
    • 周期性上报继电器状态。

正常运行后,可看到完整的“请求→处理→响应→上报”流程,与示例日志格式一致。

在这里插入图片描述

  1. 控制设备OK
  2. 第一次进到仪表盘界面自动获取状态,应该能正确显示开或者关,证明获取状态没有问题

对应log:

在这里插入图片描述


6. 任务总结

通过本次实践,完成了基于 ThingsBoard + MQTT RPC 的继电器远程控制,核心收获:

  1. 理解 RPC 核心价值:双向交互、结果可追溯,适配精准控制场景;
  2. 明确 RPC 与属性控制的区别:RPC 是“指令+反馈”,属性是“单向推送”;
  3. 掌握 RPC 格式设计:
    • getValue 返回布尔值适配开关显示,setValue 返回 JSON 仅用于调试;
    • 平台判定 setValue 成功仅看“是否收到响应+request_id 匹配”,不解析 JSON;
  4. 实现完整流程:产品创建→设备配置→仪表盘绑定→代码开发→测试验证;
  5. 方案可直接复用:适配继电器、灯光、阀门、电机等远程控制场景。

这套实现是工业物联网、智能家居中远程控制类场景的典型方案,兼顾实时性、可靠性与可维护性。


附录:常见问题排查

问题现象 排查方向
MQTT 连接失败(rc=4/5) 设备访问令牌错误、设备未激活、服务器IP/端口错误
RPC 请求无响应 未订阅 RPC 主题、method 名不匹配、request_id 回复错误
开关状态不更新 getValue 返回格式与解析函数不匹配、网络超时
setValue 判定失败 响应超时(设备处理过慢)、request_id 不一致
状态上报失败 MQTT 未连接、上报主题错误、JSON 格式非法
Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐