本文介绍了如何借助北向通道将边缘节点采集的设备属性数据上报到第三方 Kafka 消息队列。
准备一个边缘节点,并完成模拟数据源。
注意
边缘智能软件版本必须在 v2.7.0 及以上。
准备一个 Kafka 服务器。
您可以使用在云服务商平台开通的 Kafka 消息队列实例,也可以使用 Docker 自行部署一个 Kafka 实例。
在本文中,我们使用一个在火山引擎消息队列 Kafka 版中创建的实例作为示例。在示例实例中,创建了如下的操作用户和 Topic:
参照创建北向通道,创建一个北向通道。
按以下方式配置数据源(source)和数据目的地(sink):
类型 | 配置项 | 说明 |
---|---|---|
数据源配置 | 节点 | 选择您的边缘节点。 |
设备模板 | 选择 全部。 | |
驱动模板 | 选择 全部。 | |
设备实例 | 选择您在模拟数据源操作中,接入到边缘节点的虚拟设备。 | |
sink 节点配置 | sink 类型 | 选择 kafka。 |
参数配置 |
|
完成以上配置后,单击 连通性测试。确保连通性测试成功,然后单击 保存。
准备工作完成后,北向通道将被部署到边缘节点。当北向通道正常运行时,虚拟设备的随机属性数据将被上报到 Kafka 消息队列。您可以按以下方式进行验证:
默认情况下,边缘节点采集到的设备属性数据会以原始格式转发到目标 Kafka broker。如果目标 Kafka broker 需要特定的数据格式,您可以相应地修改数据转发格式。
要修改数据转发格式,按以下方式配置:
dataTemplate
参数,并定义该参数的值为数据格式转换逻辑。说明
如需编写数据格式转换逻辑,可以参考设备数据采集 - 原始数据格式以及使用 Golang template。您也可以提交工单联系我们,以获取帮助。
修改数据格式后,您可前往 Kafka 进行验证。在最新接收到的数据中,如果数据格式符合预期,表示数据转发格式已经修改成功。
下表介绍了 kafka sink 支持的所有参数。
参数 | 数据类型 | 是否必选 | 说明 | 示例值 |
---|---|---|---|---|
brokers | String | 是 | Kafka Broker 的 URL 列表。多个 URL 间以半角逗号(,)分隔。 |
|
topic | String | 是 | 发送消息到的目标 Topic。 |
|
saslAuthType | String | 是 | Kafka 的 SASL 认证类型。取值: |
|
saslUserName | String | 否 | SASL 认证的用户名。 |
|
saslPassword | String | 否 | SASL 认证的密码。 |
|
batchSize | Integer | 否 | 一个消息批次包含消息的数量。默认值为 |
|
maxAttempts | Integer | 否 | 消息发送失败后进行重试的次数。默认值为 |
|
headers | Object | 否 | 向 Kafka 上报的消息中添加的自定义标头信息。 |
|
key | String | 否 | 向 Kafka 上报的消息中添加的自定义 Key 信息。 |
|
insecureSkipVerify | Boolean | 否 | 是否跳过证书验证。默认值为 |
|
enableCache | Boolean | 否 | 是否启用本地缓存。默认值为 |
|
cleanCacheAtStop | Boolean | 否 | 北向通道停止处理时是否清除缓存。默认值为 |
|
memoryCacheThreshold | Integer | 否 | 本地内存中允许缓存的最大消息数量。默认值为 |
|
maxDiskCache | Integer | 否 | 本地磁盘中允许缓存的最大消息数量。默认值为 |
|
bufferPageSize | Integer | 否 | 缓冲区每页中包含的消息数量,防止频繁 IO。默认值为 |
|
ResendInterval | Integer | 否 | 重新发送缓存消息的时间间隔(毫秒)。默认值为 |
|