MQTT场景中,设备状态上报是一个非常场景的场景。当设备信息上报到 MQTT 服务器后,MQTT 服务器需要将全部的 MQTT 消息保存,以便后续的分析。本文将使用 Kafka 实现设备信息的汇报。
场景
- 设备将上报设备影子信息到其设备id的 mqtt topic
- mqtt 服务器配置规则,将符合条件的多个 mqtt topic 转发到一个Kafka topic
涉及产品
- MQTTx 客户端
- 云原生消息引擎 MQTT 版, mqtt.2k 规格
- 消息队列 Kafka版 2.8.2
场景实践
- Kafka 中创建配置的目标 topic
- 例子中使用 “agg” 作为topic名称
- 实例详情中获取,SASL_PLAINTEXT接入点 的 vpc接入点信息,如 kafka-xxxxxxxxxxxxxxx.kafka.ivolces.com:9093

- 创建 Kafka 用户,并配置 用户名、密码、权限

- 云原生消息引擎 MQTT 版,配置 Kafka 生产者监听器
- 配置 plaintext 的接入点地址
- 配置 用户名、密码
- 认证方法选择 plain

- 云原生消息引擎 MQTT 版,配置规则语句
- mqtt服务器规则 匹配
device/#
的全部 topic - 转发到 kafka 的
agg
topic

- 云原生消息引擎 MQTT 版,配置规则转发的连接器
- 选择已注册的 kafka 连接器
- 输入目标端的 kafka topic 名称

验证
- 通过 MQTTx 客户端,模拟2个device,发送数据到 mqtt 实例的公网接入点
- id00125 发送消息 device/id00125
- id00128 发送消息 device/id00128


- 查看 emqx 中消息已经命中
- Kafka实例查看 agg topic中已有转发数据