You need to enable JavaScript to run this app.
导航
通过自动订阅实现 p2p 指令发布
最近更新时间:2025.03.06 10:16:59首次发布时间:2025.03.06 10:16:59

概述

MQTT 设备在出厂时通常仅指定 MQTT 服务器的接入地址,但没有指定其所订阅的 MQTT Topic。云上的控制服务(通常是 Java/Go/C++)希望通过根据指定的设备id来实现指令的下发。如 某个Java实现的后端服务希望向 clientid 为 vin001 的 mqtt 设备下发消息,但 vin001 的设备仅建立了mqtt连接,并未主动订阅任何 mqtt Topic。该场景可以借助 MQ + MQTT 组合方案搭配 MQTT 自动订阅,实现类似于 “P2P” (点对点)的通信方式。

架构

准备工作

  • 消息队列 Kafka 版中创建实例,并创建一个topic名为 restart
  • 消息队列 Kafka 版中配置 Kafka 用户,其对 restart topic有读写权限
    • 测试用户:名称 root,密码 ******
  • EMQx Dashboard 中完成用户认证和授权相关配置 快速开始-构建第一个mqtt连接
    • 测试用户:名称 user,密码 123456
  • 确保 消息队列 Kafka 版 和 云原生消息引擎 MQTT 版 在同一个VPC中

为 EMQx 配置Kafka消费者订阅

  • 通过 EMQ-Kafka-Connector 绑定 EMQx 和 Kafka Topic 订阅关系
  • 连接器 -> Kafka消费者 创建一个 Kafka 消费者连接器
    • 填写 Kafka VPC接入点
    • 配置 Kafka 认证方式、用户名、密码

Image

配置转发规则

  • 侧栏 集成 -> 规则 中创建新规则
  • 编辑 数据输入,将其配置为 Kafka消费者

Image

编辑输入

  • 配置所指定的 Kafka消费者 连接器
  • 声明该规则所订阅的 Kafka 主题 为 restart
  • 选择 Key 和 Value 编码模式
    • 示例中为 Base64 和 None,可根据实际情况修改
    • 如修改编码模式,后续 规则SQL 语句相应字段的处理函数也需要同步修改

Image

配置动作输出

  • 配置输出动作为 **消息重发布,**指定订阅 Kafka Topic 数据后,经过 EMQx 处理转发的 MQTT Topic 为动态Topic,其来自 Kafka 消息 Key。
    • Java后端服务发送消息到 Kafka 时 clientid 被有意的设置为 消息key,其被 EMQx 规则处理为 /restart/key
    • 设备自动订阅 其 /restart/vin 的 MQTT Topic
    • 消息 key 与 clientID相同,如均为 vin001。即完成消息与设备的订阅绑定
  • 设置MQTT payload 为 {aim} 字段

Image

编辑规则SQL

  • 设置规则SQL
    • 将 json格式消息value 中的 aim字段 作为新的aim字段(根据上文的规则,其被传递到payload中)
    • From 字段在配置 数据输入为 kafka消费者 时,其SQL语句会自动配置,无需手动修改
SELECT
 value.aim as aim, -- payload 配置为 {aim}
 base64_decode(key) as key -- payload中未包含此字段,故不会在本例子中实际传递
FROM
  "$bridges/kafka_consumer:kafka" -- 该字段为系统自动更新

消息验证

  • 通过 消息队列 Kafka 控制台的消息发送功能向 kafka restart topic中发送数据
  • 其可以被正确的转发到连接的 vin001 设备

Image