15 【实战】ThingsBoard 通过 MQTT RPC 控制继电器设备(完整教程)
# 摘要本文基于 ThingsBoard 平台,通过 MQTT RPC 实现继电器远程控制,详解 RPC 原理、与属性控制的区别及请求/返回格式。重点说明:`getValue` 返回布尔值供平台解析显示状态;`setValue` 仅需设备正常响应即判定成功,平台不解析返回的 JSON,其仅用于调试。文章含完整 Python 代码、配置步骤与测试方法,可直接用于继电器、灯光等物联网控制场景。
【实战】ThingsBoard 通过 MQTT RPC 控制继电器设备(完整教程)
1. 任务说明
本文基于 ThingsBoard 物联网平台,通过 MQTT RPC(远程过程调用) 实现对继电器设备的远程控制。
核心目标是解决物联网设备控制的核心问题:如何让平台精准控制设备,并确认设备的执行结果,同时理清以下关键疑问:
- 什么是 RPC?为什么控制类场景必须用 RPC 而非普通属性设置?
- RPC 与传统属性设置的本质区别是什么?
- ThingsBoard 中 RPC 的请求/返回格式如何设计?如何利用平台的灵活性适配自定义返回值?
- setValue 的返回值平台是否解析?如何判定指令执行成功?
2. 知识储备
2.1 什么是 RPC,为什么要使用 RPC
RPC(Remote Procedure Call)即远程过程调用,核心是:平台像调用本地函数一样,远程执行设备上的方法,并获取执行结果。
在继电器控制场景中,RPC 是最优选择,核心原因是:
RPC 能在应用层确认请求是否被设备真正执行,而非仅确认 MQTT 数据传输层是否送达。
具体优势:
- 双向交互:平台下发指令 → 设备执行 → 返回结果,形成完整闭环;
- 结果可追溯:明确知道设备「是否收到指令、是否执行成功、执行后状态」;
- 实时性强:请求-响应模式确保指令即时生效,适配继电器这类需要精准控制的场景;
- 可靠性高:基于 MQTT QoS=1 保障指令不丢失,应用层异常可直接返回失败原因。
2.2 RPC 方式和属性设置方式的核心区别
很多新手会疑惑:“既然属性也能下发参数,为什么还要用 RPC?” 用一句话总结:
平台下发属性是「我发了,你看着办」;RPC 是「我发指令,你必须告诉我做没做、做成什么样」。
| 对比维度 | RPC 远程调用 | 属性设置(客户端属性) |
|---|---|---|
| 交互模式 | 双向:请求 → 响应 | 单向:平台下发,无强制响应 |
| 结果确认 | 应用层明确返回执行结果(成功/失败/状态) | 仅确认数据传输送达,不保证设备执行 |
| 实时性 | 即时响应,控制类场景首选 | 依赖设备主动同步,实时性差 |
| 适用场景 | 继电器开关、电机启停、阀门控制 | 配置参数、固件版本、设备描述 |
对继电器这类需要精准控制、必须确认开关状态的设备,RPC能在应用层保障执行的确定性,而非仅停留在MQTT传输层的送达确认,因此是更合理的方案。
2.3 RPC 请求与返回数据格式
2.3.1 获取状态(getValue)

下面是我实际运行时的日志,可以清晰看到完整格式:
2026-03-01 22:23:06,467 - INFO - ├─ 请求主题:v1/devices/me/rpc/request/93
2026-03-01 22:23:06,467 - INFO - ├─ 请求ID:93
2026-03-01 22:23:06,467 - INFO - └─ 请求内容:{"method":"getValue","params":null}
2026-03-01 22:23:06,467 - INFO - 📤 回复getValue -> 继电器1状态:True
2026-03-01 22:23:06,467 - INFO - 📤 准备回复RPC请求:
2026-03-01 22:23:06,467 - INFO - ├─ 返回主题:v1/devices/me/rpc/response/93
2026-03-01 22:23:06,468 - INFO - └─ 返回内容:true
- 请求主题:
v1/devices/me/rpc/request/{请求ID} - 请求内容:固定 JSON 结构,
method为方法名,params为参数 - 返回主题:
v1/devices/me/rpc/response/{请求ID}(必须和请求 ID 对应) - 返回内容:本例直接返回
true/false,无 JSON 封装(匹配平台默认解析函数)

2.3.2 控制状态(setValue)

报文内容:
📩 收到RPC请求:
2026-03-01 22:54:47,213 - INFO - ├─ 请求主题:v1/devices/me/rpc/request/96
2026-03-01 22:54:47,213 - INFO - ├─ 请求ID:96
2026-03-01 22:54:47,213 - INFO - └─ 请求内容:{"method":"setValue","params":true}
2026-03-01 22:54:47,213 - INFO - ⚡ 【硬件执行】继电器1打开
2026-03-01 22:54:47,213 - INFO - 📤 准备回复RPC请求:
2026-03-01 22:54:47,213 - INFO - ├─ 回复主题:v1/devices/me/rpc/response/96
2026-03-01 22:54:47,213 - INFO - └─ 回复内容:{"status": "success", "message": "继电器1已打开", "relay_id": 1, "current_status": true}
-
这里是转换值函数,非解析返回值:将开关状态(
true/false)转为下发给设备的params: true; -
平台下发的请求格式:
{"method":"setValue","params":true} -
设备返回的响应格式(JSON,仅用于调试/日志):
{"status": "success", "message": "继电器1已打开", "relay_id": 1, "current_status": true}
2.3.3 核心关键点:平台对 setValue 的判定逻辑 (有点奇葩)
新手必看:平台不解析 setValue 返回的 JSON 内容,仅通过以下两点判定指令是否成功:
- 响应时效性:是否在超时时间(默认500ms)内收到响应;
- 响应主题合法性:返回主题的
request_id与请求完全一致。
只要满足以上两点,哪怕返回空字符串、错误JSON、乱码,平台都判定 setValue 成功;仅“超时未响应”或“request_id 不匹配”才判定失败。
getValue返回值:用于驱动开关组件显示,格式需与解析函数匹配;setValue返回值:仅用于开发者调试/日志,开关组件不解析,状态更新依赖后续getValue请求。
3. 任务实施
3.1 创建继电器产品

- 无需提前配置属性,产品仅用于设备归类;
- 设备的“可控制属性”由后续仪表盘开关组件定义。
3.2 创建继电器设备

- 新建设备并归属到继电器产品;
- 记录设备访问令牌(代码认证用)。
3.3 添加设备到仪表盘
这是 RPC 控制的核心步骤,只有绑定开关组件才能下发 RPC 指令:
- 新建仪表盘,添加“开关控件(Switch)”;
- 配置控件数据源为目标继电器设备;
- 设置 RPC 方法:
- 查询状态:
getValue - 下发控制:
setValue
- 查询状态:
- 保持解析/转换函数默认配置:
- 解析值函数:
return data;(解析getValue布尔值) - 转换值函数:
return value;(转换开关状态为 params)
- 解析值函数:
保存后即可在仪表盘控制继电器。
4. 客户端代码
import json
import logging
import time
import threading
from paho.mqtt import client as mqtt_client
from paho.mqtt.client import MQTTv311
# ===================== 继电器设备核心配置 ======================
MQTT_BROKER = "192.168.111.53"
MQTT_PORT = 2883
MQTT_USERNAME = "p78RQ9DiwYhXiU8ULlOW" # 继电器设备访问令牌
MQTT_PASSWORD = ""
CLIENT_ID = "python-mqtt-relay-client" # 继电器专属ClientID
# MQTT主题配置(仅保留核心)
RPC_REQUEST_SUB_TOPIC = "v1/devices/me/rpc/request/+" # 订阅平台RPC请求
RPC_RESPONSE_PUB_TOPIC = "v1/devices/me/rpc/response/" # 发布RPC响应
TELEMETRY_PUB_TOPIC = "v1/devices/me/telemetry" # 上报继电器状态
# 继电器全局状态(1号继电器,False=关,True=开)
RELAY_STATUS = {
1: False
}
# 全局定时器(周期性上报继电器状态)
report_timer = None
REPORT_INTERVAL = 10 # 上报间隔(秒)
# 日志配置
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("Relay-RPC-Client")
def connect_mqtt() -> mqtt_client.Client:
"""建立MQTT连接(适配继电器设备)"""
reconnect_count = 0
def on_connect(client, userdata, flags, rc, properties=None):
nonlocal reconnect_count
reconnect_count = 0
rc_msg = {
0: "连接成功",
1: "协议版本错误",
2: "客户端ID非法",
3: "服务器不可用",
4: "用户名/密码错误",
5: "未授权",
}
if rc == 0:
logger.info(f"✅ MQTT 3.1.1 连接成功({MQTT_BROKER}:{MQTT_PORT})")
# 打印所有核心主题配置
logger.info("📋 核心MQTT主题配置:")
logger.info(f" ├─ RPC请求订阅主题:{RPC_REQUEST_SUB_TOPIC}")
logger.info(f" ├─ RPC响应发布主题(前缀):{RPC_RESPONSE_PUB_TOPIC}")
logger.info(f" └─ 继电器状态上报主题:{TELEMETRY_PUB_TOPIC}")
# 连接成功后订阅RPC请求
subscribe_rpc(client)
# 启动继电器状态周期性上报
start_report_timer(client)
# 上报初始状态
publish_relay_status(client)
else:
logger.error(f"❌ 连接失败(rc={rc}):{rc_msg.get(rc, '未知错误')}")
def on_disconnect(client, userdata, rc, properties=None):
nonlocal reconnect_count
reconnect_count += 1
if rc == 0:
logger.info("🔌 主动断开连接")
else:
disconnect_msg = {
1: "协议错误",
2: "客户端ID重复",
3: "服务器不可用",
4: "用户名密码错误",
5: "未授权",
128: "订阅越权",
}
logger.warning(
f"⚠️ 被动断开(rc={rc}):{disconnect_msg.get(rc, '服务器主动断开')},第{reconnect_count}次重连..."
)
time.sleep(min(reconnect_count * 2, 10))
client = mqtt_client.Client(
client_id=CLIENT_ID,
callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1,
protocol=MQTTv311,
)
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.auto_reconnect = True
client.reconnect_delay_set(min_delay=2, max_delay=10)
try:
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
except Exception as e:
logger.error(f"❌ TCP连接失败:{str(e)}")
raise
return client
def control_relay(relay_id: int, is_on: bool) -> dict:
"""
继电器控制核心函数(适配平台布尔值参数)
:param relay_id: 继电器编号
:param is_on: True=打开,False=关闭
:return: 执行结果字典
"""
global RELAY_STATUS
# 参数校验
if relay_id not in RELAY_STATUS:
return {
"status": "failed",
"message": f"继电器{relay_id}不存在(仅支持:{list(RELAY_STATUS.keys())})"
}
# 硬件控制逻辑(模拟,可替换为实际硬件控制)
RELAY_STATUS[relay_id] = is_on
action = "打开" if is_on else "关闭"
result = {
"status": "success",
"message": f"继电器{relay_id}已{action}",
"relay_id": relay_id,
"current_status": is_on
}
logger.info(f"⚡ 【硬件执行】继电器{relay_id}{action}")
return result
def on_rpc_request(client, userdata, msg):
"""处理平台下发的继电器RPC控制请求(适配setValue/getValue)"""
# 提取RPC请求ID(回复必须携带)
request_id = msg.topic.split("/")[-1]
payload = msg.payload.decode("utf-8", errors="ignore")
# 打印收到请求的完整主题和内容
logger.info(f"\n📩 收到RPC请求:")
logger.info(f" ├─ 请求主题:{msg.topic}")
logger.info(f" ├─ 请求ID:{request_id}")
logger.info(f" └─ 请求内容:{payload}")
try:
rpc_request = json.loads(payload)
method = rpc_request.get("method")
params = rpc_request.get("params") # 不再默认给字典,保留原始类型
relay_id = 1 # 平台未传relay_id,固定控制1号继电器(可根据需求调整)
# 处理平台的setValue/getValue方法
if method == "setValue":
# params是布尔值:True=开,False=关
if not isinstance(params, bool):
result = {"status": "failed", "message": "setValue参数必须是布尔值(true/false)"}
else:
result = control_relay(relay_id, params)
elif method == "getValue":
# getValue无参数,返回当前继电器状态(平台期望直接返回布尔值)
current_status = RELAY_STATUS.get(relay_id, False)
# 注意:平台可能期望直接返回布尔值,而非字典,这里兼容两种格式
result = current_status
logger.info(f"📤 回复getValue -> 继电器{relay_id}状态:{current_status}")
else:
result = {
"status": "failed",
"message": f"不支持的RPC方法:{method},仅支持 setValue / getValue"
}
# 回复平台RPC请求(适配getValue直接返回布尔值的场景)
response_topic = f"{RPC_RESPONSE_PUB_TOPIC}{request_id}"
# 打印回复的主题
logger.info(f"📤 准备回复RPC请求:")
logger.info(f" ├─ 回复主题:{response_topic}")
# 如果是布尔值,直接转字符串;否则序列化JSON
if isinstance(result, bool):
payload = str(result).lower() # 转小写:true/false
logger.info(f" └─ 回复内容:{payload}")
else:
payload = json.dumps(result, ensure_ascii=False)
logger.info(f" └─ 回复内容:{payload}")
client.publish(response_topic, payload, qos=1)
except json.JSONDecodeError:
result = {"status": "failed", "message": "请求内容不是合法的JSON格式"}
response_topic = f"{RPC_RESPONSE_PUB_TOPIC}{request_id}"
logger.error(f"❌ RPC解析失败:")
logger.error(f" ├─ 请求主题:{msg.topic}")
logger.error(f" ├─ 请求内容:{payload}")
logger.error(f" └─ 回复主题:{response_topic}")
client.publish(response_topic, json.dumps(result), qos=1)
except Exception as e:
result = {"status": "failed", "message": f"处理RPC请求出错:{str(e)}"}
response_topic = f"{RPC_RESPONSE_PUB_TOPIC}{request_id}"
logger.error(f"❌ RPC处理异常:")
logger.error(f" ├─ 请求主题:{msg.topic}")
logger.error(f" ├─ 异常信息:{str(e)}")
logger.error(f" └─ 回复主题:{response_topic}")
client.publish(response_topic, json.dumps(result), qos=1)
logger.error(f"❌ RPC处理异常详情:{str(e)}", exc_info=True)
def subscribe_rpc(client: mqtt_client.Client):
"""订阅平台RPC请求主题"""
# 打印订阅操作
logger.info(f"\n📌 开始订阅RPC请求主题:")
logger.info(f" ├─ 订阅主题:{RPC_REQUEST_SUB_TOPIC}")
logger.info(f" └─ QoS级别:1")
client.subscribe(RPC_REQUEST_SUB_TOPIC, qos=1)
client.message_callback_add(RPC_REQUEST_SUB_TOPIC, on_rpc_request)
logger.info(f"✅ 成功订阅RPC请求主题:{RPC_REQUEST_SUB_TOPIC},等待平台控制指令...")
def publish_relay_status(client: mqtt_client.Client):
"""上报继电器状态到平台(用于仪表盘可视化)"""
if client.is_connected():
# 构造上报数据(仅包含继电器状态)
telemetry_data = {f"relay_{k}_status": v for k, v in RELAY_STATUS.items()}
telemetry_data["timestamp"] = int(time.time() * 1000) # 时间戳
payload = json.dumps(telemetry_data, ensure_ascii=False)
# 打印上报主题和数据
logger.info(f"\n📡 准备上报继电器状态:")
logger.info(f" ├─ 上报主题:{TELEMETRY_PUB_TOPIC}")
logger.info(f" └─ 上报数据:{telemetry_data}")
result = client.publish(TELEMETRY_PUB_TOPIC, payload, qos=1)
if result[0] == 0:
logger.info(f"✅ 上报继电器状态成功(状态码:{result[0]})")
else:
logger.error(f"❌ 上报继电器状态失败(状态码:{result[0]})")
else:
logger.warning(f"⚠️ MQTT未连接,跳过本次状态上报(目标主题:{TELEMETRY_PUB_TOPIC})")
def start_report_timer(client: mqtt_client.Client):
"""启动继电器状态周期性上报定时器"""
global report_timer
# 取消原有定时器避免重复
if report_timer is not None and report_timer.is_alive():
report_timer.cancel()
# 新建定时器
report_timer = threading.Timer(REPORT_INTERVAL, publish_relay_status, args=[client])
report_timer.daemon = True
report_timer.start()
logger.debug(f"⏰ 已设置下次状态上报定时器({REPORT_INTERVAL}秒后,上报主题:{TELEMETRY_PUB_TOPIC})")
def run():
"""主函数:启动继电器RPC控制客户端"""
logger.info("🚀 启动继电器MQTT RPC控制客户端(适配setValue/getValue)")
# 打印基础配置和主题
logger.info("🔧 客户端基础配置:")
logger.info(f" ├─ MQTT服务器:{MQTT_BROKER}:{MQTT_PORT}")
logger.info(f" ├─ 设备令牌:{MQTT_USERNAME}")
logger.info(f" ├─ Client ID:{CLIENT_ID}")
logger.info(f" └─ 上报间隔:{REPORT_INTERVAL}秒")
client = connect_mqtt()
try:
# 阻塞运行MQTT客户端,监听RPC请求
client.loop_forever()
except KeyboardInterrupt:
# 优雅退出
global report_timer
if report_timer is not None:
report_timer.cancel()
logger.info("\n🛑 程序退出,断开MQTT连接...")
client.loop_stop()
client.disconnect()
if __name__ == "__main__":
try:
run()
except Exception as e:
logger.error(f"❌ 程序异常:{str(e)}", exc_info=True)
代码说明
- 基于 MQTT 3.1.1 接入 ThingsBoard,设备访问令牌认证;
- 订阅 RPC 请求主题,接收
setValue/getValue指令; setValue:接收布尔值控制继电器,返回 JSON 格式执行结果(仅调试用);getValue:返回布尔值状态,适配平台默认解析函数;- 支持断线重连、异常处理、状态周期性上报;
- 硬件控制为模拟逻辑,可直接对接实际硬件驱动。
5. 代码测试
- 修改代码中 MQTT 服务器地址与设备访问令牌;
- 运行程序,确认日志显示“MQTT 连接成功”“订阅 RPC 主题成功”;
- 在 ThingsBoard 仪表盘操作开关组件;
- 验证客户端日志:
- 收到
setValue/getValueRPC 请求; - 执行继电器控制逻辑;
- 回复正确格式的响应;
- 周期性上报继电器状态。
- 收到
正常运行后,可看到完整的“请求→处理→响应→上报”流程,与示例日志格式一致。

- 控制设备OK
- 第一次进到仪表盘界面自动获取状态,应该能正确显示开或者关,证明获取状态没有问题
对应log:

6. 任务总结
通过本次实践,完成了基于 ThingsBoard + MQTT RPC 的继电器远程控制,核心收获:
- 理解 RPC 核心价值:双向交互、结果可追溯,适配精准控制场景;
- 明确 RPC 与属性控制的区别:RPC 是“指令+反馈”,属性是“单向推送”;
- 掌握 RPC 格式设计:
getValue返回布尔值适配开关显示,setValue返回 JSON 仅用于调试;- 平台判定
setValue成功仅看“是否收到响应+request_id 匹配”,不解析 JSON;
- 实现完整流程:产品创建→设备配置→仪表盘绑定→代码开发→测试验证;
- 方案可直接复用:适配继电器、灯光、阀门、电机等远程控制场景。
这套实现是工业物联网、智能家居中远程控制类场景的典型方案,兼顾实时性、可靠性与可维护性。
附录:常见问题排查
| 问题现象 | 排查方向 |
|---|---|
| MQTT 连接失败(rc=4/5) | 设备访问令牌错误、设备未激活、服务器IP/端口错误 |
| RPC 请求无响应 | 未订阅 RPC 主题、method 名不匹配、request_id 回复错误 |
| 开关状态不更新 | getValue 返回格式与解析函数不匹配、网络超时 |
| setValue 判定失败 | 响应超时(设备处理过慢)、request_id 不一致 |
| 状态上报失败 | MQTT 未连接、上报主题错误、JSON 格式非法 |
更多推荐



所有评论(0)