You need to enable JavaScript to run this app.
导航
设备状态汇报到 Kafka
最近更新时间:2025.03.05 21:44:45首次发布时间:2025.03.05 21:44:45

MQTT场景中,设备状态上报是一个非常场景的场景。当设备信息上报到 MQTT 服务器后,MQTT 服务器需要将全部的 MQTT 消息保存,以便后续的分析。本文将使用 Kafka 实现设备信息的汇报。

场景

  1. 设备将上报设备影子信息到其设备id的 mqtt topic
  2. mqtt 服务器配置规则,将符合条件的多个 mqtt topic 转发到一个Kafka topic

涉及产品

  1. MQTTx 客户端
  2. 云原生消息引擎 MQTT 版, mqtt.2k 规格
  3. 消息队列 Kafka版 2.8.2

场景实践

  1. Kafka 中创建配置的目标 topic
    1. 例子中使用 “agg” 作为topic名称
    2. 实例详情中获取,SASL_PLAINTEXT接入点 的 vpc接入点信息,如 kafka-xxxxxxxxxxxxxxx.kafka.ivolces.com:9093

Image

  1. 创建 Kafka 用户,并配置 用户名、密码、权限

Image

  1. 云原生消息引擎 MQTT 版,配置 Kafka 生产者监听器
    1. 配置 plaintext 的接入点地址
    2. 配置 用户名、密码
    3. 认证方法选择 plain

Image

  1. 云原生消息引擎 MQTT 版,配置规则语句
  • mqtt服务器规则 匹配 device/# 的全部 topic
  • 转发到 kafka 的 agg topic

Image

  1. 云原生消息引擎 MQTT 版,配置规则转发的连接器
    1. 选择已注册的 kafka 连接器
    2. 输入目标端的 kafka topic 名称

Image

验证

  1. 通过 MQTTx 客户端,模拟2个device,发送数据到 mqtt 实例的公网接入点
    1. id00125 发送消息 device/id00125
    2. id00128 发送消息 device/id00128

Image
Image

  1. 查看 emqx 中消息已经命中
  1. Kafka实例查看 agg topic中已有转发数据