MQTT 实例可以通过强大的内置规则引擎和数据集成功能,对消息和事件进行实时数据处理、转换和路由,然后根据配置的动作将处理结果进行消息重发布或转发到外部系统。本文介绍将 MQTT 消息转发到 BMQ 实例的详细配置流程。
登录云原生消息引擎控制台。
在顶部菜单栏,选择项目和地域。
在左侧导航栏选择 MQTT 实例,然后单击创建实例。
在创建实例页面,设置实例的基本信息、实例配置、网络配置等关键参数,然后单击下一步:确认订单。
参数 | 说明 |
---|---|
基本信息 | |
计费类型 | 选择实例计费的类型。目前支持按量计费和包年包月两种计费类型,如需了解计费详情,请参见计费说明。 |
实例名称 | 输入实例名称。
|
地域及可用区 | 地域已选定,不可更改。根据业务详情选择可用区。 |
所属项目 | 从下拉列表中选择实例所属项目。 |
实例配置 | |
计算规格 | 根据业务场景预估最大连接数和 TPS,然后选择适合的实例规格。 |
网络配置 | |
私有网络 | 从下拉列表中选择 VPC。如果还未创建私有网络,请参见创建私有网络。 |
子网 | 从下拉列表中选择子网。 |
查看实例配置信息,然后阅读并勾选产品相关协议,再单击提交。
查看实例创建进度。
提交购买订单后,您可以返回 MQTT 实例列表页面。购买的实例显示为初始化中,初始化完成后显示为运行中。
在 MQTT 实例中,需要创建和 BMQ 实例的连接,完成两者的网络打通。
监听器是 MQTT 与客户端之间连接的核心组件,负责接受客户端连接请求。为方便验证,本文选择创建公网路由、TCP协议的监听器。
在 MQTT 实例中,您需要创建用户用于登录 Dashboard。系统为 Dashboard 用户预设了管理员和查看者两种角色,本文需要创建一个管理员角色的用户。
连接器是 MQTT 用于连接外部系统或服务的组件,目前仅支持创建 Kafka 生产者连接器,该连接器实际是连接到 BMQ 实例。
说明
填写的主机列表即是 MQTT 连接的 BMQ 实例的连接地址。
规则引擎是 MQTT 内置的数据处理功能,您可以通过定义 SQL 语句来处理并筛选消息,无需编写代码即可实现数据提取、过滤和转换处理,并将处理后的数据转发到外部系统。目前仅支持转发到 Kafka 生产者连接器,本质上是转发到 BMQ 实例。
说明
除了通过创建规则以外,您也可以选择使用 Flow 设计器,在可视化页面通过拖拽和连接操作即可快速配置数据处理流程。更多信息,请参见创建 Flow。
登录 Dashboard,在导航栏选择集成 > 规则。
在规则页签下,单击页面右上角的创建。
设置规则名称和备注信息。
在 SQL 编辑器中输入 SQL 语句。
下图中的 SQL 语句为SELECT * FROM "mqtt-topic-1"
,表示选择名为mqtt-topic-1
主题中所有的信息。
在创建规则右侧的动作输出页签下,单击添加动作。
在下拉列表中,选择动作类型为 Kafka 生产者。
配置将消息处理结果转发到 Kafka 生产者的参数。
此处仅介绍部分重要参数,如需了解更多,请参见创建规则。
参数 | 说明 |
---|---|
动作类型 | 此处选择Kafka 生产者。 |
动作 | 仅支持创建动作。 |
名称 | 自定义设置 Sink 组件的名称。 |
连接器 | 从下拉列表中选择已提前创建好的 Kafka 生产者连接器。 |
描述 | 自定义设置描述语句。 |
Kafka 主题名称 | 在 Kafka 生产者连接器对应的 BMQ 实例中,已经创建好的 Topic 名称。 |
SQL 语句和动作配置完成后,单击页面左下角的创建,完成规则的创建。
完成上述所有配置流程后,您可以发送消息到 MQTT 主题。如果主题与 SQL 语句中配置的主题匹配,将会触发规则,MQTT 会将消息处理结果转发到 BMQ 实例。
您可以使用第三方的开源 Java SDK 发送消息到 MQTT 主题,开源 Java SDK 下载地址为:Eclipse Paho Java Client。
配置 Broker 地址、MQTT 主题、消息内容等信息,然后执行代码。
代码片段如下所示:
public class MqttPublishSample { public static void main(String[] args) { String topic = "mqtt-topic-1"; String content = "Hello World"; int qos = 2; String broker = "tcp://mqtt-00k0wt***.mqtt.volces.com:1883"; String clientId = "client-test-1"; MemoryPersistence persistence = new MemoryPersistence();
在 BMQ 实例中查询消息。