唯一键引擎(HaUniqueMergeTree) 是 ByteHouse 自研的一款既保留了 ClickHouse 高效的查询性能、又支持主键更新的表引擎。它解决了社区版 ClickHouse 不能支持高效更新操作的痛点,帮助业务更简单地开发实时分析应用。
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2], ... ) ENGINE = HaUniqueMergeTree(shard, replica, version_column) -- 默认为 '/clickhouse/bytehouse/库名.表名/{shard}','{replica}' PARTITION BY toYYYYMM(EventDate) ORDER BY expr [PARTITION BY expr] UNIQUE KEY expr [SAMPLE BY expr] [TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx' [, ...] ] [WHERE conditions] [GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ] ] [SETTINGS name=value, ...]
UNIQUE KEY (product_id, sipHash64(city))
注意
建议 Unique key 设置不超过5个,以避免可能产生的性能影响:
其他的字段设置,如 Order By
,Partition By
等,和 MergeTree 家族的其他引擎的设置规则一致。
参数名 | 常用字段 | 默认值 | 说明 |
---|---|---|---|
partition_level_unique_keys | 是 | 1 | 0:UNIQUE KEY 表粒度唯一 |
enable_unique_partial_update | 是 | 0 | 允许部分列更新写入(需要新引擎版本支持) |
enable_disk_based_unique_key_index | 是 | 1 | 0:in-memory mode。在此方式下,系统会在每张unique表维护一个in-memory key index,因此能支撑的数据量受限于内存 ; |
假设表Schema如下:
-- 引擎默认保证 unique key 在分区内的唯一性 -- 注:UNIQUE KEY 不支持 Nullable CREATE TABLE t1 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id;
插入数据。此时,写入相同 key 的数据可以实现更新(upsert语义)。
INSERT INTO t1 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100); INSERT INTO t1 VALUES ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 4, 400), ('2020-10-29 23:50:00', 10003, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10001, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10002, 'Beijing', '男装', 2, 200);
查询自动返回每个key最新的数据:
select * from t1 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:40:00 │ 10001 │ Beijing │ 男装 │ 5 │ 500 │ │ 2020-10-29 23:50:00 │ 10002 │ Beijing │ 男装 │ 4 │ 400 │ │ 2020-10-29 23:50:00 │ 10003 │ Beijing │ 男装 │ 2 │ 200 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10001 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
UNIQUE KEY 也可以包含多个字段和表达式,如下面以两个字段:product_id, sipHash64(city)为例:
-- UNIQUE KEY 可以包含多个字段和表达式 CREATE TABLE t1m ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1m/{shard}', '{replica}') PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY (product_id, sipHash64(city)); INSERT INTO t1m VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100), ('2020-10-29 23:50:00', 10002, 'Shanghai', '男装', 4, 400), ('2020-10-29 23:50:00', 10003, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100); select * from t1m; ┌──────────event_time─┬─product_id─┬─city─────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:40:00 │ 10001 │ Beijing │ 男装 │ 5 │ 500 │ │ 2020-10-29 23:40:00 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │ │ 2020-10-29 23:50:00 │ 10003 │ Beijing │ 男装 │ 2 │ 200 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-29 23:50:00 │ 10002 │ Shanghai │ 男装 │ 4 │ 400 │ └─────────────────────┴────────────┴──────────┴──────────┴────────┴─────────┘
假设表Schema如下:
CREATE TABLE t2 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('xxxxxxx') PARTITION BY toDate(event_time) --分区字段 ORDER BY (city, category) --排序字段 UNIQUE KEY product_id --唯一键 SETTINGS partition_level_unique_keys = 0; --设置表级别唯一
顺序插入以下测试数据:
INSERT INTO t2 (event_time, product_id, city, category, amount, revenue) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100); INSERT INTO t2 (event_time, product_id, city, category, amount, revenue) VALUES ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 4, 400), ('2020-10-29 23:50:00', 10003, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10001, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10002, 'Beijing', '男装', 2, 200);
可以看到,10001,10002,10003 这三个产品都更新到了最新数据,且10001,10002 都从 2020-10-29 分区更新到了 2020-10-30 分区。
select * from t2 order by toDate(event_time), product_id; ┌──────event_time─┬product_id─┬─city──┬category─┬amount─┬revenue─┐ │ 2020-10-29 23:50:00 │ 10003 │ Beijing │ 男装 │ 2 │ 200 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10001 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │ └─────────────┴───────┴─────┴──────┴─────┴─────┘
默认情况下,相同 unique key 后写入的数据会覆盖已有的数据。这可能会带来以下问题
为了解决上面的问题,HaUniqueMergeTree 支持将表中的某个字段指定为版本字段。引擎保证写入相同 key 的数据时,只有数据版本 >= 已有版本时,才会进行覆盖。版本字段支持所有UInt类型和Data/DateTime,且不能为 Nullable。
说明
使用版本字段时有以下限制:
假设schema如下:
CREATE TABLE t3 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t3/{shard}', '{replica}', event_time) --event_time为版本字段 PARTITION BY toDate(event_time) --分区字段 ORDER BY (city, category) --排序字段 UNIQUE KEY product_id; --唯一键
顺序插入以下数据:
INSERT INTO t3 (event_time, product_id, city, category, amount, revenue) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10001, 'Beijing', '男装', 8, 800), ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 5, 500);
结果保留后两条。
select * from t3 order by toDate(event_time), product_id; ┌──────event_time─┬product_id─┬─city──┬category─┬amount─┬revenue─┐ │ 2020-10-29 23:50:00 │ 10001 │ Beijing │ 男装 │ 8 │ 800 │ │ 2020-10-29 23:50:00 │ 10002 │ Beijing │ 男装 │ 5 │ 500 │ └─────────────┴───────┴─────┴──────┴─────┴─────┘
若在此时重新导入回溯前两条数据。
INSERT INTO t3 (event_time, product_id, city, category, amount, revenue) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200);
由于版本 < 已有版本,写入时自动跳过。10001 和 10002 的版本没有回退。
select * from t3 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:50:00 │ 10001 │ Beijing │ 男装 │ 8 │ 800 │ │ 2020-10-29 23:50:00 │ 10002 │ Beijing │ 男装 │ 5 │ 500 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
考虑以下Lambda架构的需求场景:
我们可以将分区字段(日期)作为版本字段来实现该场景,然而这需要额外存储一个日期字段。由于分区下所有数据的日期都是一样的,这样做显然存在资源浪费。
为了优化该场景,HaUniqueMergeTree 支持直接使用分区表达式作为版本。当引擎发现版本字段为分区字段时,会自动从元数据中读取版本,避免额外的数据读写。
-- 创建一张按天分区、表粒度唯一的 unique 表,使用分区字段作为版本 CREATE TABLE t4 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t4/{shard}', '{replica}', toDate(event_time)) PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id SETTINGS partition_level_unique_keys = 0; -- 10-29 实时任务数据写入 INSERT INTO t4 VALUES ('2020-10-29 10:00:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 10:00:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 10:10:00', 10001, 'Beijing', '男装', 8, 800), ('2020-10-29 10:10:00', 10002, 'Beijing', '男装', 5, 500); -- 10-30 实时任务数据写入,将 10002 更新到 10-30 分区 INSERT INTO t4 VALUES ('2020-10-30 08:00:00', 10002, 'Beijing', '男装', 10, 1000), ('2020-10-30 08:00:00', 10003, 'Beijing', '男装', 3, 300); -- 10-30 离线任务重写 10-29 数据 INSERT INTO t4 VALUES ('2020-10-29 10:10:00', 10001, 'Beijing', '男装', 7, 700), ('2020-10-29 10:10:00', 10002, 'Beijing', '男装', 5, 500); -- 离线任务只会覆盖 10001 数据,10002 保留 10-30 中的最新数据 select * from t4 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 10:10:00 │ 10001 │ Beijing │ 男装 │ 7 │ 700 │ │ 2020-10-30 08:00:00 │ 10002 │ Beijing │ 男装 │ 10 │ 1000 │ │ 2020-10-30 08:00:00 │ 10003 │ Beijing │ 男装 │ 3 │ 300 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
在某些应用场景下,用户希望在INSERT时加上一个字段来标识是否删除来扩展INSERT语义。
在HaUniqueMergeTree引擎中,为每张表都添加了一个保留字段_delete_flag_
,可在 INSERT / INSERT SELECT 时指定,其类型为UInt8
, 0 表示数据写入,非 0 表示数据删除。
需要注意的是,_delete_flag_
字段仅可在 INSERT / INSERT SELECT 或者创建物化视图时指定,不可以在 CREATE TABLE 时指定,也不可查询该字段。
说明
删除行功能可以在对底表为HaUniqueMergeTree的分布式表进行操作,前提是分布式表需要配置参数remote_table_is_ha_unique = 1。
用法示例如下:
假设schema如下:
CREATE TABLE t1 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64, ) ENGINE = HaUniqueMergeTree(xxxx) PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id; --唯一键
导入以下数据:
注:需要选择单个节点,并插入本地表来使用 delete flag 做数据删除
INSERT INTO t1_local (event_time, product_id, city, category, amount, revenue, _delete_flag_) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500, 0), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200, 0), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100, 0), ('2020-10-29 23:50:00', 10001, 'Beijing', '男装', 4, 400, 5), ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 2, 200, 1), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100, 0);
查询结果中包含了新加入的一行数据,并删除了两行旧数据:
select * from t1 order by toDate(event_time), product_id; ┌──────event_time─┬─product_id┬─city──┬─category┬amount─┬revenue─┐ │ 2020-10-29 23:40:00 │ 10003 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ └─────────────┴───────┴─────┴──────┴─────┴─────┘
针对某个 where 条件对多行数据进行批量删除的方式如下:
insert into `t1_local` (*, _delete_flag_) select *, 1 as _delete_flag_ from `t1_local` where product_id=10001;
使用条件:
行更新模式:缺省列采用默认值填充
部分列更新模式:缺省列如果有原值则保留,否则填充默认值
unique表指定变量enable_unique_partial_update = 1后允许写入以部分列更新模式进行,默认关闭。
部分列写入模式当且仅当unique表开启了部分列更新功能才有效,否则等效为行更新模式:
类型的默认值
部分列更新写入模式时如果表包含版本字段
列更新规则
方式一:按功能列_update_columns_(String类型)区分更新列方案的列更新规则
CREATE TABLE t1 ( k Int32, c1 Int32, c2 Nullable(Float64), c3 Nullable(String), c4 Nullable(Int64), m1 Map(String, Int32), a1 Array(String)) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1, partial_update_enable_specify_update_columns = 1, partial_update_enable_merge_map = 0; SET enable_unique_partial_update = 1; INSERT INTO t1 (k, c1, c2, m1, a1) VALUES (1, 10, 3.14, {'k1':1}, ['hello']); -- 此时解析时会填充_update_columns_为'k,c1,c2,m1,a1' ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 10 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k1':1} │ ['hello'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c2, m1, a1, _update_columns_) VALUES (1, 20, 31.4, {'k2':2}, ['world'], 'k,c1,m1,a1'); ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 20 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k2':2} │ ['world'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c3, m1) VALUES (1, 0, 'foo', {'k3':3}); -- 此时解析是会填充_update_columns_为'k,c1,c3,m1' -- 等价于INSERT INTO t1 (k, c1, c3, m1, _update_columns_) VALUES (1, 0, 'foo', {'k2':2}, 'k, c1, c3, m1'); -- 此时c1会强制更新为0,c3强制更新为'foo',m1强制更新为'k3':3 ┌─k─┬─c1─┬───c2─┬─c3──┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 0 │ 3.14 │ foo │ ᴺᵁᴸᴸ │ {'k3':3} │ ['world'] │ └───┴────┴──────┴─────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c2, c3, c4, m1, a1, _update_columns_) VALUES (1, 10, 31.4, 'goo', 15, {'k4': 4}, ['hello', 'world'], ''); -- _update_columns_为空时表示更新所有列 ┌─k─┬─c1─┬───c2─┬─c3──┬─c4─┬─m1───────┬─a1────────────────┐ │ 1 │ 10 │ 31.4 │ goo │ 15 │ {'k4':4} │ ['hello','world'] │ └───┴────┴──────┴─────┴────┴──────────┴───────────────────┘
方式二:按类型默认值区分更新列方案的列更新规则
CREATE TABLE t1 ( k Int32, c1 Int32, c2 Nullable(Float64), c3 Nullable(String), c4 Nullable(Int64), m1 Map(String, Int32), a1 Array(String)) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1, partial_update_enable_specify_update_columns = 0, partial_update_enable_merge_map = 1; SET enable_unique_partial_update = 1; INSERT INTO t1 (k, c1, c2, m1, a1) VALUES (1, 10, 3.14, {'k1':1}, ['hello']); ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 10 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k1':1} │ ['hello'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c3, m1) VALUES (1, 0, 'foo', {'k2':2}); -- 对于未指定字段 -- - c1为非组合类型,但写入的是默认值,因此保留旧值 -- - c2/c4为Nullable类型,保留旧值 -- - a1为Array空值,保留旧值 -- 对于m1,更新m1{'k2'},保留m1{'k1'} ┌─k─┬─c1─┬───c2─┬─c3──┬───c4─┬─m1──────────────┬─a1────────┐ │ 1 │ 10 │ 3.14 │ foo │ ᴺᵁᴸᴸ │ {'k1':1,'k2':2} │ ['hello'] │ └───┴────┴──────┴─────┴──────┴─────────────────┴───────────┘ INSERT INTO t1 (k, c1, c2, c3, c4, m1, a1) VALUES (1, 20, null, 'bar', 30, {'k2':0, 'k3':3}, ['world']); -- c1不是默认值,更新 -- c2为null,保留旧值;c3/c4不为null,更新 -- 更新m1{'k2'}, m1{'k3'},保留m1{'k1'} -- a1不为空值,更新 ┌─k─┬─c1─┬───c2─┬─c3──┬─c4─┬─m1─────────────────────┬─a1────────┐ │ 1 │ 20 │ 3.14 │ bar │ 30 │ {'k1':1,'k2':0,'k3':3} │ ['world'] │ └ ───┴────┴──────┴─────┴────┴────────────────────────┴───────────┘
【注意】使用这种方式在建表时 慎用默认值!
在建表时可以在字段后指定默认值,默认值可以为表达式。如果用户指定了默认值,那么写入时对于缺省的列会按照建表时指定的方式进行填充,而部分列更新判断是否为默认值时是按照引擎内数据类型的默认值进行判断,因此可能会产生不符合预期的行为。下面将举个例子进行说明
CREATE TABLE t1 ( k Int32, c1 Int32 DEFAULT k + 1, c2 Nullable(Float64)) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1, partial_update_enable_specify_update_columns = 0; insert into t1 values (1, 10, 3.14); ┌─k─┬─c1─┬───c2─┐ │ 1 │ 10 │ 3.14 │ └───┴────┴──────┘ insert into t1 (k, c2) values (1, 31.4); ┌─k─┬─c1─┬───c2─┐ │ 1 │ 2 │ 31.4 │ └───┴────┴──────┘ -- 如果在建表时没有指定从c1的默认值,那么这条写入的语义是将k=1的那条数据的c2被更新为31.4,但是由于建表指定了默认值,等价于insert into t1 values (1, 2,31.4),因此c1也被更新为了2。这种情况下为了达到仅更新c2的目的,可以有以下两种方式:1. 建表时不要使用默认值;2. 显示指定默认值,即insert into t1 values (1, 0,31.4) -- ┌─k─┬─c1─┬───c2─┐ -- │ 1 │ 10 │ 31.4 │ -- └───┴────┴──────┘
在使用 in-memory 模式下,HaUniqueMergeTree 的导入性能约是 HaMergeTree 或 社区的 MergeTree 引擎的一半,为 10k rows/s/shard,或 20 MB/s。性能相比社区的 ReplacingMergeTree 也接近一致。
但查询性能上,HaUniqueMergeTree 的性能和 HaMergeTree 或 MergeTree 引擎一致。
在使用 disk-based 模式下,HaUniqueMergeTree 的导入性能约是 HaMergeTree 或 社区的 MergeTree 引擎的 30-40%,为 6-8k rows/s/shard,或 12-16 MB/s。查询性能依旧不变。
目前unique 表有两种key index使用方式:in-memory 和 disk-based。由表级参数enable_disk_based_unique_key_index
控制。
如果包含很多字段,考虑使用哈希值。例:UNIQUE KEY sipHash64(val1,val2,val3....)
优先使用默认的分区级别唯一。
如果业务场景必须使用表级别唯一,考虑设置 TTL、采用更粗粒度的分区(例如按月分区)等方式减少分区数量。
如果需要指定版本字段,优先考虑分区值作为版本,减少内存占用。
Consumer 会直接写本地 shard。因此