部分列更新,主要是指直接更新表中某些字段值,而不是全部的字段值。Flink + ByteHouse-CDW 的 Partial Update 是一种用于流式数据处理的表引擎功能,旨在通过多流并发更新同一张表的部分列,最终实现完整数据的更新。本文是关于这项功能其功能、特点及适用场景的详细介绍:
部分列更新的功能包含如下方面:
以下是常见的部分列更新的适用场景:
在使用之前,需要注意 Flink 版本要求:请确保 Flink 使用 1.16 以上版本,并且 ByteHouse-CDW Connector 在1.27.109 以上版本。
以下是一个使用 Partial Update 的示例,例如我们有构建用户主数据的公共维表逻辑。上游有三张用户相关的信息表:
而我们在做业务分层的时候,可能会需要构建以客户为中心(客户id主键)的主数据,包含用户的各方面维度信息,以支持各项业务查询处理。
我们根据业务描述,希望按照如下模式进行业务建模,使用 ByteHouse-CDW 建表语句创建部分更新表如下:
-- 引擎默认保证 unique key 在分区内的唯一性
-- 注:UNIQUE KEY 不支持 Nullable
CREATE TABLE partial_update_table (
-- 主键
k Int32,
-- 基本信息
userid Nullable(Int64),
username Nullable(String),
-- 客服信息
csmid Nullable(Int64),
csmname Nullable(String)
) ENGINE = CnchMergeTree
UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1,
partial_update_enable_merge_map = 0;
关键设计:
enable_unique_partial_update
:设置此值为 1 ,代表开启部分列更新。partial_update_enable_merge_map
:设置值为 0 ,代表不开启 Map 的合并。以下 Flink SQL 从两个个数据源各自读取部分列,将其他的不修改的数据从 Sink 中去掉,这样可以保证每个数据流都只更新部分数据,最终达到部分更新下游宽表的效果:
CREATE TABLE `bh_source1` (
k INT,
userid BIGINT,
username STRING,
PRIMARY KEY (`k`) NOT ENFORCED
) WITH (
-- 这里根据实际存储情况添加对应的连接配置等信息,以下以Kafka为例简单示意,实际中需要补充完整
'connector' = 'datagen'
);
CREATE TABLE `bh_source2` (
k INT,
csmid BIGINT,
csmname STRING,
PRIMARY KEY (`k`) NOT ENFORCED
) WITH (
-- 这里根据实际存储情况添加对应的连接配置等信息,以下以Kafka为例简单示意,实际中需要补充完整
'connector' = 'datagen'
);
-- 定义结果表
-- 先将结果表各个部分列插入的公共字段部分定义在这里
-- 一般会包含主键及写入的公共字段
CREATE TABLE `bh_sink_base` (
k INT,
PRIMARY KEY (`k`) NOT ENFORCED
) WITH (
'connector' = 'bytehouse-cdw',
'jdbc.enable-gateway-connection' = 'true',
'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
'bytehouse.gateway.host' = 'tenant-xxxx.bytehouse.ivolces.com',
'bytehouse.gateway.port' = '19000',
'bytehouse.gateway.api-token' = '<< your API token >>',
'bytehouse.gateway.virtual-warehouse' = 'default_vw',
'database' = 'demo_database',
'table-name' = 'demo_table'
);
-- 定义第一个部分列更新流需要写入或者更新的字段
-- 这里继承了之前定义 bh_sink_base 表的所有字段和属性
CREATE TABLE `bh_de_sink1` (
userid BIGINT,
username STRING,
) LIKE sink_base (
INCLUDING ALL
OVERWRITING OPTIONS
);
-- 定义第二个部分列更新流需要写入或者更新的字段
-- 这里同样继承了之前定义 bh_sink_base 表的所有字段和属性
CREATE TABLE `bh_de_sink2` (
csmid BIGINT,
csmname STRING,
) LIKE sink_base (
INCLUDING ALL
OVERWRITING OPTIONS
);
-- 部分列更新
INSERT INTO `bh_de_sink1`
SELECT `k`, `userid`, `username`
FROM `bh_source1`;
INSERT INTO `bh_de_sink2` (
SELECT `k`, `csmid`, `csmname`
FROM `bh_source2`;
问题现象:当部分列更新选择数组、Map 的合并时,如果上游发生删除等操作?ByteHouse-CDW 表是否会发生数组内元素、Map 内元素的删除?
解答:截至目前 ByteHouse-CDW 暂时还不支持部分列更新的回撤能力。如果需要妥善处理删除、更新等操作,建议在上游 Flink 任务中处理后直接写入下游表字段中。