You need to enable JavaScript to run this app.
ByteHouse 企业版

ByteHouse 企业版

复制全文
最佳实践
最佳实践:Kafka 去重导入 ByteHouse
复制全文
最佳实践:Kafka 去重导入 ByteHouse

本文通过一个“实时监控工厂车间温度和湿度数据收集”的使用场景,来介绍 Kafka 去重导入 ByteHouse 的使用流程。
工厂会在关键车间设置热传感器,收集温度、湿度等各种传感器数据,并发送给 ByteHouse 进行分析。通过聚合查询和明细查询,可以解决温度和湿度的平均值、极值、变化趋势、警报事件统计等问题,以及特定事件的详细信息、数据校验和准确性、响应维护需求等。

数据示例

表名

数据来源

样例数据

数据意义

热量传感器数据表
(事实表)
每秒生成 1000 条数据
涉及数据从 2021-2023年3年
希望每个ID每小时保留最新一行数据;

Kafka

{
    "sensorId": "sensor1031984076491",
    "timestamp": "2023-12-01T00:00:20",
    "temperature": 31.68,
    "humidity": 55.09,
    "heading": "SW",
    "alertFlags": [
        "humidityDrop"
    ],
    "gpsLatitude": -20.510689,
    "gpsLongitude": 42.272274
}
  • sensorId (String): 传感器ID。
  • timestamp (DateTime): 数据记录的时间戳。
  • temperature (Float64): 当前温度读数,单位摄氏度。
  • humidity (Float64): 当前湿度读数,单位为百分比。
  • heading:传感器朝向,为枚举值。
  • alertFlags (Array(String)): 警报标志,例如["overheat", "humidityDrop"]。
  • gpsLatitudegpsLongitude(Float64):经度,纬度。

操作流程

第一步:准备 Kafka 环境

  1. 准备一个 Kafka 集群,需要确保 Kafka 集群与 ByteHouse 网络互通(如在同一 VPC 内)。
  2. 新建一个 Topic:
    • 命名 sensor,专门用于heat_sensor_data 的数据的订阅。
    • 由于数据会以 ParitionNum%ByteHouseShardCount的对应关系消费到 ByteHouse,分区数建议为 ByteHouse 分片数的整数倍,使得数据最终落入分片时可以均衡。比如此示例有 4 个 Shard,则建议分区数选择 4 个,8 个等。此处以 8 个示例。
  3. 在 Kafka 中写入数据。写入数据时,请参考 HaKafka 表的写入规则,详情请参见HaKafka

第二步:新建库表

  • 我们需要 1 张 ByteHouse 本地表(又称为 MergeTree 家族表),用于存储 Kafka 接收到的事实表数据。命名为 heat_sensor_data_local
  • 另外需要一张分布式表,用于接收查询请求,命名为 heat_sensor_data

新建库

我们可以通过 ByteHouse 控制台完成新建名为 sensor 的数据库。
Image
或在客户端里通过 SQL 创建:

CREATE DATABASE sensor ON CLUSTER bytehouse_test 

新建本地表与分布式表

  1. 选择表引擎,可以参考下面的说明。考虑需求是希望每个 ID 每小时保留最新一行数据,​此处采用 HaUniqueMergeTree 。

    说明

    在选择 MergeTree 家族表引擎时,请判断是否需要去重,或者由于列的数据产生不一致,需要部分列更新。

    • 是:使用 HaUniqueMergeTree。
    • 否:使用 HaMergeTree。
  2. 做好数据映射。以下是基于导入的 JSON 数据示例,我们对要注意的地方进行了标注。

    {    
        "sensorId": "sensor1031984076491", //String
        "timestamp": "2023-12-01T00:00:20", //DateTime
        "temperature": 31.68,//Float64
        "humidity": 55.09,//Float64
        "heading": "SW", //字段是常量,建议使用 LowCadinality,LowCadinality 自动进行了编码,查询更快
        "alertFlags": ["humidityDrop"],//因此这个场景下,其中只有 ["overheat", "humidityDrop"] 两种可能,可以展开到2个bool字段,之后查询 SQL 可以更简单
        "gpsLatitude": -20.510689,
        "gpsLongitude": 42.272274
    }
    
  3. 考虑数据分布。
    基于性能考虑,HaUniqueMergeTree 引擎只能在分片内去重。这就意味着,不论以什么方式写入,相同唯一键的数据,必须插入相同的分片。
    数据通过 Kafka 导入方式插入 ByteHouse,最终落入 ByteHouse 是根据 Partition_num%shard_count 选择分片,例如此场景有 2 个分片,上游有 4 个分区,则 3 号分区会插入 1 号分片。
    Image
    基于上述两点,上游需要保证 Kafka 按照唯一键(sensorId)为依据进行分区,可以做到所有 sensorId 一致的行都落到同一分片,以成功去重。若使用非去重的引擎 HaMergeTree,则无此问题。

  4. 确定其他表关键字段选择。

    • 排序键:选取最常查询的字段,这个例子中显然是 sensorId。
    • 唯一键:在此示例中,每个 ID 每小时希望保留一条,但没有表示小时的字段,因此我们可以采用物化列方式(MATERIALIZED COLUMN)新建一列 hour,让每次导入数据时从 datetime 生成 hour 字段,将唯一键设置为 (sensorId, hour)
    • 分片键:在此场景下,Kafka 的 Partition 和分片有自动映射关系,不设置分片键。
    • 分区键:推荐使用时间字段,如 timestamp,但单位为天,否则每秒都会产生一个分区。
    • TTL:TTL 建议依据分区键设置,此处根据需求,配置为 3 年。
  5. 确认字段是否为 Nullable。本案例中需考虑的因素如下:

    • sensorId 和 timestamp 字段是排序键和分区键,需要确保他们不可为空,不然会导致性能下降。
    • 反之,其他字段都建议为空(除非能 100% 确保不为空),防止为空造成导入错误。
  6. 确认其他注意事项:

    • 字段名建议和 JSON 中保持完全一致,便于后续导入任务时自动匹配。
    • 表的重要 Settings:
      • index_granularity,索引的颗粒度,值越小单行查的越快,值越大批量查得越快;建议采用默认的 8192。
      • partition_level_unique_keys:去重级别
        • 1:默认值,表示分区粒度去重,即数据在分区内唯一,此场景不建议调整。但注意业务上要知晓这个限制。
        • 0:全表粒度去重,即数据在全表范围内唯一,会引起更多内存,以及使得冷存储等很多场景会不支持。因此除非业务强要求,否则不建议。
      • enable_disk_based_unique_key_index:unique key 索引位置:
        • 1:unique_key_index 在磁盘上,会导致导入性能会下降 30%;
        • 0:默认值,unique_key_index 在内存上,但会有一定内存开销,要注意内存的监控,您可参考监控告警,查看内存监控信息。
  7. 基于上述要求,生成 SQL 如下:

    -- 创建本地表
    CREATE TABLE sensor.heat_sensor_data_unique_local ON CLUSTER bytehouse_test (
        sensorId String,
        timestamp DateTime,
        hour DateTime MATERIALIZED toStartOfHour(timestamp),
        temperature Nullable(Float64),
        humidity Nullable(Float64),
        heading LowCadinality(Nullable(String)),
        overheat Nullable(Bool), 
        humidityDrop Nullable(Bool),
        gpsLatitude Nullable(Float64),
        gpsLongitude Nullable(Float64)
    ) ENGINE = HaUniqueMergeTree('/clickhouse/tables/{shard}/sensor/heat_sensor_data_unique_local', '{replica}')
    UNIQUE KEY (sensorId, hour)
    ORDER BY sensorId
    PARTITION BY toDate(timestamp)
        TTL timestamp + INTERVAL 3 YEAR
    SETTINGS index_granularity = 8192;
    
    -- 创建分布式表
    CREATE TABLE sensor.heat_sensor_data_unique ON CLUSTER bytehouse_test AS sensor.heat_sensor_data_unique_local ENGINE = Distributed(bytehouse_test, sensor, heat_sensor_data_unique_local);
    

第三步:配置 Kafka 导入任务

新建数据源

  1. 登录 ByteHouse 控制台,单击数据导入,在右上角选择对应集群
    Image

  2. 单击数据源模块中的“+”,新建数据源,数据源类型选择为 Kafka。
    Image

  3. 在右侧数据源配置界面,根据界面提示,依次输入以下信息:

    参数项

    配置说明

    源类型

    选择 Kafka 数据源类型。

    源名称

    任务名称,和其他任务不能重名。

    Kafka 代理列表

    填写对应的 Kafka Broker 地址。填写时,请务必填写您使用的接入点所有 Broker 节点的 IP 地址。如果需要填写多个 Broker 地址,请用逗号(,)进行分割。如 10.100.19.127:9092,10.100.19.128:9092
    如果您使用的是火山引擎 BMQ,请使用在准备工作中添加的接入点,请勿使用默认接入点,否则网络无法联通,详情请参考准备工作

    身份验证模式

    当前 Kafka 数据源支持四种鉴权模式,包括 NONE 无鉴权、PLAIN、SCRAM-SHA-256、SCRAM-SHA-512,并支持 SSL 加密,您可根据 Kafka 代理列表 IP 地址的端口号勾选,对应关系如下:

    • None 无鉴权:端口号为 9092。
    • PLAIN、SCRAM-SHA-256、SCRAM-SHA-512
      • 如果未开启 SSL:端口号为 9093。
      • 如果开启 SSL:端口号为 9095。

    安全协议

    支持选择 SASL_PLAINTEXT、SASL_SSL 协议类型。

    用户名、密码

    填写有权限访问 Kafka 实例的用户名和密码信息。

  4. 数据源信息填写完成后,单击确定按钮,进行数据源连通性测试,连通成功后,即代表数据源创建成功。

新建导入任务

  1. 在 Kafka 数据源下,单击新建导入任务

  2. 进入新建导入任务配置界面,并完成以下信息配置:

    参数

    说明

    通用信息

    任务名称

    填写导入任务名称信息,支持数字、字母及下划线,不能以数字开头,最多仅支持 128 字符,且不能和现有任务重名。

    描述

    输入该导入任务相关描述信息,方便后续维护管理。

    选择数据源

    源类型

    选择 Kafka 数据源类型。

    数据源

    下拉选择已创建成功的 Kafka 数据源。

    Topic

    下拉选择 Kafka 数据源中的已有的 Topic 信息,本案例中为 sensor。

    Group 名称

    Kafka Consumer Group 名称。建议手动设置,便于运维任务时辨认,如 bytehouse_sensor_data_consumer。
    如果您使用的是火山引擎云原生消息引擎 BMQ,需要手动填写在准备工作环节创建的 Consumer Group 名称,详情请参考准备工作

    自动重设 Offset

    指初次启动任务时,Kafka 最新生产的数据开始消费的 offset,第二次启动任务时,会从上次消费暂停的 offset 恢复。

    格式

    消息格式,目前最常用 JSONEachRow。

    分隔符

    输入消息分隔符,一般使用 '\n'。

    消费者个数

    消费者个数,每个消费者会创建一个线程。建议填写 2,后续消费性能不够可以继续改大(最大不超过核数/2)。

    写入 Block Size

    写入的 block_size 大小。建议按默认的 65536,如果对实时性要求高,可以减半。

    选择目标表

    目标数据库

    下拉选择数据导入的目标 ByteHouse 数据库。

    目标数据表

    下拉选择数据导入的目标 ByteHouse 表。
    Image

    目标 Schema 配置

    提取 Schema

    此处配置 Kafka 中的信息和 ByteHouse 表信息的映射。

    • 对于 Kafka 中有数据的场景,选择“数据映射”,则会采样 Kafka Topic 中的一条数据进行解析。
    • 对于没有数据的场景,可以选择 JSON,填写一条 JSON 示例进行匹配。

    使用数据映射方式时,注意覆盖方式选择“覆盖添加”。
    如果目标表名和 JSON 中的字段名完全一样,则会自动匹配上,如果不同,则需要进行编辑。
    hour 列是物化列,不需要导入,可以点击行右侧的“删除”。还可能有一些不需要导入数据库的列,也可以按此方式删除。
    Image
    同时,以下一些常见的情况要特殊处理,需要通过导入公式,进行源列到目标列的数据转化,详见下文导入公式示例说明。

    导入公式示例

    • Array 数据提取:此示例中,"alertFlags" 字段里只会出现 overheat 和 humidityDrop 两个值,因此希望将数据提取到 overheat 和 humidityDrop 两个 Bool 列,如果数组中有 "overheat",则列 overheat = True。
      因此按如下方法设置 overheat 列(humidityDrop列同理):

      • 源列:overheat(按目标列填写)
      • 源数据类型:Bool(按目标列类型选择)
      • 源表达式:has(alertFlags,"overheat")
        Image
    • JSON 多层嵌套处理:
      例如,如果 "gpsLatitude" 嵌套在一个 mapFloat 中,要把它提取到 gpsLatitude 列:

      "mapFloat": {
          "gpsLatitude": -20.510689,
          "gpsLongitude": 42.272274
      }
      

      需要按如下规则填写:

      • 源列:gpsLatitude(按目标列填写)
      • 源数据类型:Float64(按目标列类型选择)
      • 源表达式:JSONExtract(_content, 'mapFload','gpsLatitude', 'Float64')(参考:JSON 函数
    • Timestamp 处理:

      • 时区:如果导入数据的时区和集群时区不一致,通过表达式 toTimezone(timestamp, <timezone>)处理;
        • 源列:timestamp(按目标列填写)
        • 源数据类型:DateTime
        • 源表达式:toTimezone(timestamp, 'Asia/Shanghai')
      • 无法解析:有时时间格式会出现一些无法解析的格式,如 "2024-01-16T12:00:00Z"(时间戳中含有0时区标记)等。具体是否可以转化,可以通过在 SQL 编辑器中用 toDateTime(?) 来测试。若无法转化,可以使用parseDateTimeBestEffortOrNull(timestamp)表达式。
        • 源列:timestamp(按目标列填写)
        • 源数据类型:String
        • 源表达式:parseDateTimeBestEffortOrNull(timestamp)
    • 特殊值处理:

      • 源列是 Null,但当前字段是非空的,需要使用 assumeNotNull(x)
      • 原列是 "N/A",需要转成空,需要使用nullif(x, 'N/A')
      • 其他请参考产品手册 SQL Reference 中的各类函数部分,以及依照“日志”中的报错,做相应调整。
  3. 提交任务,等待一段时间。也可以点击 导入任务名称 > 执行 ID > 日志,查看导入留下的日志,如果有数据写入成功或失败了,则会留下日志。例如:

测试数据

select sensorId, timestamp from sensor.heat_sensor_data_unique order by desc limit 10;

执行上述命令,如果已经可以查到数据,则表明导入成功了。

最近更新时间:2026.02.03 20:10:16
这个页面对您有帮助吗?
有用
有用
无用
无用