You need to enable JavaScript to run this app.
导航
创建规则
最近更新时间:2024.12.17 14:28:36首次发布时间:2024.07.18 16:53:23

在 Dashboard 页面,您可以定义 SQL 语句用来处理并筛选来自消息、事件以及外部数据系统的数据,并为规则添加动作,实现将处理结果通过消息发布、打印到控制台或通过 Sink 进行转发的目的。

背景信息

规则是 MQTT 内置的数据处理功能。

  • 使用 SQL 语句编排规则,并通过消息、客户端事件以及外部数据系统触发,无需编写代码即可实现一站式的 IoT 数据提取、过滤和转换处理,并将处理完成后的数据输出到动作或外部数据系统中。
  • 配合使用 Sink/Source 组件,提供了清晰、灵活的业务集成方案,能够简化业务开发流程,提升易用性。

目前支持消息重发布控制台输出外部数据系统 3 种动作类型,每种动作类型的作用和应用场景不同,请先查看以下描述,再根据业务场景创建合适的规则。

  • 消息重发布:用来发布一条新的 MQTT 消息,适用于需要发送下行消息的场景。
  • 控制台输出:用于查看规则的输出结果,结果将以日志的形式打印到控制台或日志中。控制台输出动作仅用于调试,如果在生产环境中使用,可能会导致性能问题。
  • 外部数据系统:将规则结果转发到外部数据系统,支持转发到 MQTT 服务、HTTP 服务、Kafka 生产者、MySQL 等 30 种外部数据系统。

前提条件

如果您需要创建将处理结果转发到外部数据系统的规则,请提前创建相应类型的连接器,具体操作步骤,请参见 创建连接器

设置 SQL 语句

  1. 在 Dashboard 导航栏选择集成 > 规则
  2. 规则页签下,单击页面右上角的创建
  3. 设置规则名称和备注信息。
  4. 在 SQL 编辑器中输入 SQL 语句。

    说明

    下图中的 SQL 语句为SELECT * FROM "mqtt-topic-1",表示选择名为mqtt-topic-1的主题中所有的信息。
    系统默认是选择t/#模式的所有主题下的消息。您修改 SQL 语句后,右侧的数据输入页签下的信息也会发生变化。

    Image

测试 SQL 语句

您可以使用模拟数据执行 SQL 语句,验证 SQL 执行结果是否符合预期。

  1. 启用调试区域,开启开关。
  2. 选择与 SQL 语句匹配的数据来源,然后设置客户端 ID用户名主题QoSPayload 等参数。
  3. 单击运行测试按钮提交测试,然后查看右侧的输出结果。

    说明

    SQL 处理结果将以 JSON 形式呈现在输出结果部分。SQL 处理结果中的所有字段都可以通过后续操作(内置操作或 Sink)以 ${key} 的形式进行引用。

    Image

添加动作

  1. 在创建规则右侧的动作输出页签下,单击添加动作
  2. 在下拉列表中,选择目标动作类型,然后查看相应动作的规则配置方法。
    Image
  3. SQL 语句和动作配置完成后,单击页面左下角的创建,完成规则的创建。

    说明

    规则创建后,默认启用。
    当消息、事件以及外部数据系统的数据触发规则,将执行 SQL 以及规则中添加的所有动作,并获得每个步骤的执行结果。

添加消息发布动作

消息重发布动作用来发布一条新的 MQTT 消息,适用于需要发送下行消息的场景。
Image

参数

说明

动作类型

此处选择消息重发布

主题

将消息重发布到 MQTT 的目标主题,比如doc/topic-1

说明

消息发布时,主题不能使用+#通配符。

QoS

按需设置 MQTT 消息服务质量(QoS)。

  • QoS 0:最多交付一次,消息可能丢失。
  • QoS 1:至少交付一次,消息可以保证到达,但是可能重复。
  • QoS 2:只交付一次,消息保证到达,并且不会重复。

Retain

设置是否将此消息作为保留消息转发,默认为 False。

Payload

输入 ${payload},表示重新发布的消息将与原始消息具有相同的 payload,不进行任何修改。

MQTT 5.0 消息属性

是否配置 MQTT 5.0 消息属性,启用后,可配置以下参数。

  • 有效载荷指示器:指示消息格式。设置为false时,消息是未确定的字节;设置为true时,意味着消息体中的有效载荷是 UTF-8 编码的字符数据。这有助于更加有效的解析消息内容,而不用特意去对消息体进行格式或类型的判断。
  • 消息过期时间:输入一个时间,以秒为单位。指定消息在经过一段时间后过期,如果未传递给预期的接收方,则被视为无效。
  • 内容类型:指定重新发布消息中的载荷内容的类型或格式。text/plain表示文本文件;audio/aac表示音频文件;application/json表示是一条 JSON 格式的应用消息。
  • 响应主题:输入要将响应消息发布到的特定 MQTT 主题。例如,希望将响应发送到名为 response/my_device 的主题。
  • 对比数据:输入一个唯一标识符或数据,用于将响应消息与原始请求消息相关联。您可以输入唯一的请求标识符、事务 ID 或在您的应用程序上下文中有意义的任何其他信息。

添加控制台输出动作

控制台输出动作用于查看规则的输出结果,结果将以日志的形式打印到控制台或日志中。

说明

控制台输出动作仅用于调试。如果在生产环境中使用,可能会导致性能问题。

Image

添加发送到 Sink 的动作

将规则结果通过 Sink 组件转发到外部数据系统。

  • 发送到 Kafka 生产者

    说明

    将规则结果转发到 Kafka 生产者连接器对应实例的 Topic 中,请先在相应实例中创建 Topic。

    Image

    参数

    说明

    动作类型

    此处选择 Kafka 生产者

    动作

    仅支持创建动作

    名称

    自定义设置 Sink 组件的名称。

    连接器

    从下拉列表中选择已提前创建好的 Kafka 生产者连接器。如何创建,请参见创建连接器

    描述

    自定义设置描述语句。

    Kafka 主题名称

    在 Kafka 生产者连接器对应的实例中,已经创建好的 Topic 名称。

    Kafka Headers

    输入与消息相关的元数据或上下文信息。按需提供用作 Kafka Headers 的占位符,例如${pub_props}

    说明

    • 占位符的值必须是一个对象。
    • 您也可以单击添加,添加更多的 Kafka Headers。

    Kafka Headers 值编码类型

    Kafka Headers 的值编码类型:

    • NONE:仅将二进制值添加到 Kafka Headers。
    • JSON:仅将 JSON 值添加到 Kafka Headers,并在发送之前将其编码为 JSON 字符串。

    消息的键

    消息键。输入一个字符串,可以是纯字符串或包含占位符 (${var}) 的字符串。

    消息的值

    消息值。输入一个字符串,可以是纯字符串或包含占位符 (${var}) 的字符串。

    消息的时间戳

    消息时间戳。

    压缩

    是否使用压缩算法压解压消息中的记录。

    分区选择策略

    选择生产者向分区分发消息的方式。

    分区限制

    限制生产者能够发送消息的最大分区数量。

  • 发送到 MySQL

    说明

    将规则结果转发到 MySQL 连接器对应的数据库中,请先在相应数据库中创建好数据表。

    Image

    参数

    说明

    动作类型

    此处选择 MySQL

    动作

    仅支持创建动作

    名称

    自定义设置 Sink 组件的名称。

    连接器

    从下拉列表中选择已提前创建好的 MySQL 连接器。如何创建,请参见创建连接器

    描述

    自定义设置描述语句。

    SQL 模板

    构造 SQL 语句,从规则处理结果中提取数据,并写入指定数据表中。
    SQL 模板示例如下:

    insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) values (${id}, ${topic}, ${qos}, ${payload}, FROM_UNIXTIME(${timestamp}/1000))
    

    以上 SQL 语句实现了将 (${id},${topic},${qos},${payload}) 数据写入表 t_mqtt_msg 中对应 msgidtopicqospayload 列中。

    说明

    ${id}${topic}${qos}${payload} 为占位符,会被实际的 MQTT 消息数据替换。