You need to enable JavaScript to run this app.
导航
最佳实践:Kafka 去重导入 ByteHouse
最近更新时间:2024.09.24 16:49:18首次发布时间:2024.09.24 16:49:18

本文通过一个“实时监控工厂车间温度和湿度数据收集”的使用场景,来介绍 Kafka 去重导入 ByteHouse的使用流程。
工厂会在关键车间设置热传感器,收集温度、湿度等各种传感器数据,并发送给 ByteHouse 进行分析。通过聚合查询和明细查询,可以解决温度和湿度的平均值、极值、变化趋势、警报事件统计等问题,以及特定事件的详细信息、数据校验和准确性、响应维护需求等。

数据示例

表名

数据来源

样例数据

数据意义

热量传感器数据表
(事实表)
每秒生成 1000 条数据
涉及数据从 2021-2023年3年
希望每个ID每小时保留最新一行数据;

Kafka

{
    "sensorId": "sensor1031984076491",
    "timestamp": "2023-12-01T00:00:20",
    "temperature": 31.68,
    "humidity": 55.09,
    "heading": "SW",
    "alertFlags": [
        "humidityDrop"
    ],
    "gpsLatitude": -20.510689,
    "gpsLongitude": 42.272274
}
  • sensorId (String): 传感器ID。
  • timestamp (DateTime): 数据记录的时间戳。
  • temperature (Float64): 当前温度读数,单位摄氏度。
  • humidity (Float64): 当前湿度读数,单位为百分比。
  • heading:传感器朝向,为枚举值。
  • alertFlags (Array(String)): 警报标志,例如["overheat", "humidityDrop"]。
  • gpsLatitudegpsLongitude(Float64):经度,纬度。

操作流程

第一步 准备 Kafka 环境

  1. 准备一个 Kafka 集群,需要确保两者网络互通(如在同一 VPC 内)。
  2. 新建一个 Topic:
    • 命名 sensor,专门用于heat_sensor_data 的数据的订阅。
    • 由于数据会以 ParitionNum%ByteHouseShardCount的对应关系消费到 ByteHouse,分区数建议为 ByteHouse 分片数的整数倍,使得数据最终落入分片时可以均衡。比如此示例有 4 个 Shard,则建议分区数选择 4 个,8 个等。此处以 8 个示例。
  3. 在 Kafka 中数据写入。

第二步 新建库表

  • 我们需要 1 张 ByteHouse 本地表(又称为 MergeTree 家族表),用于存储 Kafka 接收到的事实表数据。命名为 heat_sensor_data_local
  • 另外需要一张分布式表,用于接收查询请求,命名为 heat_sensor_data

新建库

我们可以通过ByteHouse控制台完成新建数据库。

或在客户端里通过 SQL 创建:

CREATE DATABASE sensor ON CLUSTER bytehouse_test 

新建本地表与分布式表

  1. 选择表引擎,可以参考下面的说明。考虑需求是**希望每个ID每小时保留最新一行数据,**此处采用 HaUniqueMergeTree 。

说明

在选择 MergeTree 家族表引擎时,请判断是否需要去重?或者由于列的数据产生不一致,需要部分列更新?

  • 是:使用 HaUniqueMergeTree。
  • 否:使用 HaMergeTree。
  1. 做好数据映射。以下示基于导入的 JSON 数据示例,我们对要注意的地方进行了标注。
{    
    "sensorId": "sensor1031984076491", //String
    "timestamp": "2023-12-01T00:00:20", //DateTime
    "temperature": 31.68,//Float64
    "humidity": 55.09,//Float64
    "heading": "SW", //字段是常量,建议使用 LowCadinality,LowCadinality 自动进行了编码,查询更快
    "alertFlags": ["humidityDrop"],//因此这个场景下,其中只有 ["overheat", "humidityDrop"] 两种可能,可以展开到2个bool字段,之后查询 SQL 可以更简单
    "gpsLatitude": -20.510689,
    "gpsLongitude": 42.272274
}
  1. 考虑数据分布

基于性能考虑,HaUniqueMergeTree 引擎只能在分片内去重。这就意味着,不论以什么方式写入,相同唯一键的数据,必须插入相同的分片。
数据通过 Kafka 导入方式插入 ByteHouse,最终落入 ByteHouse 是根据 Partition_num%shard_count 选择分片,例如此场景有 2 个分片,上游有 4 个分区,则 3 号分区会插入 1 号分片。
暂时无法在飞书文档外展示此内容
基于上述2点,上游需要保证 Kafka 按照唯一键(sensorId)为依据进行分区,可以做到所有 sensorId 一致的行都落到同一分片,以成功去重;(若使用非去重的引擎 HaMergeTree,则无此问题)

  1. 确定其他表关键字段选择
  • 排序键:选取最常查询的字段,这个例子中显然是 sensorId。
  • 唯一键:在此示例中,每个 ID 每小时希望保留一条,但没有表示小时的字段,因此我们可以采用物化列方式(MATERIALIZED COLUMN)新建一列 hour,让每次导入数据时从 datetime 生成 hour 字段,将唯一键设置为 (sensorId, hour)
  • 分片键:在此场景下,Kafka 的 Partition 和分片有自动映射关系,不设置分片键。
  • 分区键:推荐使用时间字段,如 timestamp,但单位为天(否则每秒都会产生一个分区)。
  • TTL:TTL 建议依据分区键设置,此处根据需求,配置为 3 年。
  1. Nullable 选择问题:
  • sensorId 和 timestamp 字段是排序键和分区键,需要确保他们不可为空,不然会导致性能下降;
  • 反之,其他字段都建议为空(除非能100%确保不为空),防止为空造成导入错误;
  1. 其他注意事项
  • 字段名建议和 json 中保持完全一致,便于后续导入任务时自动匹配。
  • 表的重要 Settings:
    • index_granularity,索引的颗粒度,值越小单行查的越快,值越大批量查得越快;建议采用默认的 8192;
    • partition_level_unique_keys:去重级别
      • 1:(默认值)分区粒度去重:数据在分区内唯一,此场景不建议调整(但注意业务上要知晓这个限制)。
      • 0:全表粒度去重:数据在全表范围内唯一,会引起更多内存,以及使得冷存储等很多场景会不支持。因此除非业务强要求,否则不建议。
    • enable_disk_based_unique_key_index:unique key 索引位置:
      • 1:unique_key_index 在磁盘上,会导致导入性能会下降 30%;
      • 0:(默认值)unique_key_index 在内存上,但会有一定内存开销,要注意内存的监控(见“场景运维指南”)。
  1. 基于上述要求,生成 SQL 如下:
CREATE TABLE sensor.heat_sensor_data_unique_local ON CLUSTER bytehouse_test (
    sensorId String,
    timestamp DateTime,
    hour DateTime MATERIALIZED toStartOfHour(timestamp),
    temperature Nullable(Float64),
    humidity Nullable(Float64),
    heading LowCadinality(Nullable(String)),
    overheat Nullable(Bool), 
    humidityDrop Nullable(Bool),
    gpsLatitude Nullable(Float64),
    gpsLongitude Nullable(Float64)
) ENGINE = HaUniqueMergeTree()
UNIQUE KEY (sensorId, hour)
ORDER BY sensorId
PARTITION BY toDate(timestamp)
    TTL timestamp + INTERVAL 3 YEAR
SETTINGS index_granularity = 8192;

--建分布式表,
CREATE TABLE sensor.heat_sensor_data_unique ON CLUSTER bytehouse_test AS sensor.heat_sensor_data_unique_local ENGINE = Distributed(bytehouse_test, sensor, heat_sensor_data_unique_local);

第三步 配置 Kafka 导入任务

新建数据源

  1. 进入 数据管理和查询>数据导入>数据源>(中间栏右上角)新建数据源
  2. 选择源类型为 Kafka。
  3. 填写源名称。
  4. 填写 Kafka 代理列表,注意,对应不同安全协议要填写不同的接入点。

说明

建议对内网安全敏感采用 sasl_ssl ;对安全要求不高,则采用 sasl_plaintext 效率更高。

  1. 填写用户名,密码。
  2. 点击确定,会进行一次对 Kafka 的的连接校验,如果校验成功,则会成功新建数据源。

新建导入任务

  1. 进入控制台 数据管理和查询>数据导入>数据源>数据源详情,单击新建导入任务按钮。
  1. 输入任务名,选择导入类型为 Kafka,选择 TOPIC(sensor)。
  1. 高级键设置
  • Group 名称建议手动设置,便于运维任务时辨认,如 bytehouse_sensor_data_consumer;
  • 自动重设 Offset,Kafka 格式和分隔符根据根据个人要求选择。
  • 消费者个数建议填写 2,后续消费性能不够可以继续改大(最大不超过核数/2)。
  • BlockSize 建议按默认的 65536,如果对实时性要求高,可以减半;
  1. 选择目标库表:
  1. 编辑原列和目标列的映射关系
  • 对于 Kafka 中有数据的场景,选择“数据映射”,则会采样 Kafka Topic 中的一条数据进行解析。
  • 对于没有数据的场景,可以选择 JSON,填写一条 JSON 示例进行匹配。

注意覆盖方式选择“存量覆盖”。
如果目标表名和JSON中的字段名完全一样,则会自动匹配上,如果不同,则需要进行编辑。
hour 列是物化列,不需要导入,可以点击行右侧的“删除”。还可能有一些不需要导入数据库的列,也可以按此方式删除。

同时,以下一些常见的情况要特殊处理,需要通过导入公式,进行源列到目标列的数据转化。

导入公式示例

  • Array 数据提取:

    此示例中,"alertFlags" 字段里只会出现 overheat 和 humidityDrop 两个值,因此希望将数据提取到 overheat 和 humidityDrop 两个 Bool 列,如果数组中有 "overheat",则列 overheat = True。
    因此按如下方法设置 overheat 列(humidityDrop列同理):

    • 源列:overheat(按目标列填写)
    • 源数据类型:Bool(按目标列类型选择)
    • 源表达式:has(alertFlags,"overheat")
  • JSON 多层嵌套处理:

    例如,如果 "gpsLatitude" 嵌套在一个 mapFloat 中,要把它提取到 gpsLatitude 列:

"mapFloat": {
    "gpsLatitude": -20.510689,
    "gpsLongitude": 42.272274
}

需要按如下规则填写:

  • 源列:gpsLatitude(按目标列填写)
  • 源数据类型:Float64(按目标列类型选择)
  • 源表达式:JSONExtract(_content, 'data', 'temperature' ,'Float64')(参考:JSON 函数--ByteHouse 企业版-火山引擎
  • Timestamp 处理:
    1. 时区:如果导入数据的时区和集群时区不一致,通过表达式 toTimezone(timestamp, <timezone>)处理;
      • 源列:timestamp(按目标列填写)
      • 源数据类型:DateTime
      • 源表达式:toTimezone(timestamp, 'Asia/Shanghai')
    2. 无法解析:有时时间格式会出现一些无法解析的格式,如 "2024-01-16T12:00:00Z"(时间戳中含有0时区标记)等。具体是否可以转化,可以通过在 SQL 编辑器中用 toDateTime(?) 来测试。若无法转化,可以使用parseDateTimeBestEffortOrNull(timestamp)表达式。
      • 源列:timestamp(按目标列填写)
      • 源数据类型:String
      • 源表达式:parseDateTimeBestEffortOrNull(timestamp)
  • 特殊值处理:
    • 源列是 Null,但当前字段是非空的,需要使用 assumeNotNull(x)
    • 原列是 "N/A",需要转成空,需要使用nullif(x, 'N/A')
    • 其他请参考产品手册 SQL Reference 中的各类函数部分,以及依照“日志”中的报错,做相应调整。
  1. 提交任务。

提交任务,等待一段时间。也可以点击 导入任务名称 > 执行 ID > 日志,查看导入留下的日志,如果有数据写入成功或失败了,则会留下日志。例如:

测试数据

select sensorId, timestamp from sensor.heat_sensor_data_unique order by desc limit 10;

执行上述命令,如果已经可以查到数据,则表明导入成功了。