概述
MQTT 设备在出厂时通常仅指定 MQTT 服务器的接入地址,但没有指定其所订阅的 MQTT Topic。云上的控制服务(通常是 Java/Go/C++)希望通过根据指定的设备id来实现指令的下发。如 某个Java实现的后端服务希望向 clientid 为 vin001 的 mqtt 设备下发消息,但 vin001 的设备仅建立了mqtt连接,并未主动订阅任何 mqtt Topic。该场景可以借助 MQ + MQTT 组合方案搭配 MQTT 自动订阅,实现类似于 “P2P” (点对点)的通信方式。
架构

准备工作
- 消息队列 Kafka 版中创建实例,并创建一个topic名为 restart
- 消息队列 Kafka 版中配置 Kafka 用户,其对 restart topic有读写权限
- EMQx Dashboard 中完成用户认证和授权相关配置 快速开始-构建第一个mqtt连接
- 确保 消息队列 Kafka 版 和 云原生消息引擎 MQTT 版 在同一个VPC中
为 EMQx 配置Kafka消费者订阅
- 通过 EMQ-Kafka-Connector 绑定 EMQx 和 Kafka Topic 订阅关系
- 连接器 -> Kafka消费者 创建一个 Kafka 消费者连接器
- 填写 Kafka VPC接入点
- 配置 Kafka 认证方式、用户名、密码

配置转发规则
- 侧栏 集成 -> 规则 中创建新规则
- 编辑 数据输入,将其配置为 Kafka消费者

编辑输入
- 配置所指定的 Kafka消费者 连接器
- 声明该规则所订阅的 Kafka 主题 为 restart
- 选择 Key 和 Value 编码模式
- 示例中为 Base64 和 None,可根据实际情况修改
- 如修改编码模式,后续 规则SQL 语句相应字段的处理函数也需要同步修改

配置动作输出
- 配置输出动作为 **消息重发布,**指定订阅 Kafka Topic 数据后,经过 EMQx 处理转发的 MQTT Topic 为动态Topic,其来自 Kafka 消息 Key。
- Java后端服务发送消息到 Kafka 时 clientid 被有意的设置为 消息key,其被 EMQx 规则处理为 /restart/key
- 设备自动订阅 其 /restart/vin 的 MQTT Topic
- 消息 key 与 clientID相同,如均为 vin001。即完成消息与设备的订阅绑定
- 设置MQTT payload 为 {aim} 字段

编辑规则SQL
- 设置规则SQL
- 将 json格式消息value 中的 aim字段 作为新的aim字段(根据上文的规则,其被传递到payload中)
- From 字段在配置 数据输入为 kafka消费者 时,其SQL语句会自动配置,无需手动修改
SELECT
value.aim as aim, -- payload 配置为 {aim}
base64_decode(key) as key -- payload中未包含此字段,故不会在本例子中实际传递
FROM
"$bridges/kafka_consumer:kafka" -- 该字段为系统自动更新
消息验证
- 通过 消息队列 Kafka 控制台的消息发送功能向 kafka restart topic中发送数据
- 其可以被正确的转发到连接的 vin001 设备
