本文通过一个“实时监控工厂车间温度和湿度数据收集”的使用场景,来介绍 Kafka 去重导入 ByteHouse的使用流程。
工厂会在关键车间设置热传感器,收集温度、湿度等各种传感器数据,并发送给 ByteHouse 进行分析。通过聚合查询和明细查询,可以解决温度和湿度的平均值、极值、变化趋势、警报事件统计等问题,以及特定事件的详细信息、数据校验和准确性、响应维护需求等。
表名 | 数据来源 | 样例数据 | 数据意义 |
---|---|---|---|
热量传感器数据表 | Kafka |
|
|
heat_sensor_data
的数据的订阅。ParitionNum%ByteHouseShardCount
的对应关系消费到 ByteHouse,分区数建议为 ByteHouse 分片数的整数倍,使得数据最终落入分片时可以均衡。比如此示例有 4 个 Shard,则建议分区数选择 4 个,8 个等。此处以 8 个示例。heat_sensor_data_local
。heat_sensor_data
。我们可以通过ByteHouse控制台完成新建数据库。
或在客户端里通过 SQL 创建:
CREATE DATABASE sensor ON CLUSTER bytehouse_test
说明
在选择 MergeTree 家族表引擎时,请判断是否需要去重?或者由于列的数据产生不一致,需要部分列更新?
{ "sensorId": "sensor1031984076491", //String "timestamp": "2023-12-01T00:00:20", //DateTime "temperature": 31.68,//Float64 "humidity": 55.09,//Float64 "heading": "SW", //字段是常量,建议使用 LowCadinality,LowCadinality 自动进行了编码,查询更快 "alertFlags": ["humidityDrop"],//因此这个场景下,其中只有 ["overheat", "humidityDrop"] 两种可能,可以展开到2个bool字段,之后查询 SQL 可以更简单 "gpsLatitude": -20.510689, "gpsLongitude": 42.272274 }
基于性能考虑,HaUniqueMergeTree 引擎只能在分片内去重。这就意味着,不论以什么方式写入,相同唯一键的数据,必须插入相同的分片。
数据通过 Kafka 导入方式插入 ByteHouse,最终落入 ByteHouse 是根据 Partition_num%shard_count 选择分片,例如此场景有 2 个分片,上游有 4 个分区,则 3 号分区会插入 1 号分片。
基于上述2点,上游需要保证 Kafka 按照唯一键(sensorId)为依据进行分区,可以做到所有 sensorId 一致的行都落到同一分片,以成功去重;(若使用非去重的引擎 HaMergeTree,则无此问题)
(sensorId, hour)
。index_granularity
,索引的颗粒度,值越小单行查的越快,值越大批量查得越快;建议采用默认的 8192;partition_level_unique_keys
:去重级别
enable_disk_based_unique_key_index
:unique key 索引位置:
CREATE TABLE sensor.heat_sensor_data_unique_local ON CLUSTER bytehouse_test ( sensorId String, timestamp DateTime, hour DateTime MATERIALIZED toStartOfHour(timestamp), temperature Nullable(Float64), humidity Nullable(Float64), heading LowCadinality(Nullable(String)), overheat Nullable(Bool), humidityDrop Nullable(Bool), gpsLatitude Nullable(Float64), gpsLongitude Nullable(Float64) ) ENGINE = HaUniqueMergeTree() UNIQUE KEY (sensorId, hour) ORDER BY sensorId PARTITION BY toDate(timestamp) TTL timestamp + INTERVAL 3 YEAR SETTINGS index_granularity = 8192; --建分布式表, CREATE TABLE sensor.heat_sensor_data_unique ON CLUSTER bytehouse_test AS sensor.heat_sensor_data_unique_local ENGINE = Distributed(bytehouse_test, sensor, heat_sensor_data_unique_local);
数据管理和查询>数据导入>数据源>(中间栏右上角)新建数据源
。说明
建议对内网安全敏感采用 sasl_ssl ;对安全要求不高,则采用 sasl_plaintext 效率更高。
数据管理和查询>数据导入>数据源>数据源详情
,单击新建导入任务
按钮。注意覆盖方式选择“存量覆盖”。
如果目标表名和JSON中的字段名完全一样,则会自动匹配上,如果不同,则需要进行编辑。
hour 列是物化列,不需要导入,可以点击行右侧的“删除”。还可能有一些不需要导入数据库的列,也可以按此方式删除。
同时,以下一些常见的情况要特殊处理,需要通过导入公式,进行源列到目标列的数据转化。
Array 数据提取:
此示例中,"alertFlags" 字段里只会出现 overheat 和 humidityDrop 两个值,因此希望将数据提取到 overheat 和 humidityDrop 两个 Bool 列,如果数组中有 "overheat",则列 overheat = True。
因此按如下方法设置 overheat 列(humidityDrop列同理):
has(alertFlags,"overheat")
JSON 多层嵌套处理:
例如,如果 "gpsLatitude" 嵌套在一个 mapFloat 中,要把它提取到 gpsLatitude 列:
"mapFloat": { "gpsLatitude": -20.510689, "gpsLongitude": 42.272274 }
需要按如下规则填写:
JSONExtract(_content, 'data', 'temperature' ,'Float64')
(参考:JSON 函数--ByteHouse 企业版-火山引擎)toTimezone(timestamp, <timezone>)
处理;
toTimezone(timestamp, 'Asia/Shanghai')
parseDateTimeBestEffortOrNull(timestamp)
表达式。
parseDateTimeBestEffortOrNull(timestamp)
assumeNotNull(x)
;nullif(x, 'N/A')
;提交任务,等待一段时间。也可以点击 导入任务名称 > 执行 ID > 日志,查看导入留下的日志,如果有数据写入成功或失败了,则会留下日志。例如:
select sensorId, timestamp from sensor.heat_sensor_data_unique order by desc limit 10;
执行上述命令,如果已经可以查到数据,则表明导入成功了。