部分列更新,主要是指直接更新表中某些字段值,而不是全部的字段值。可以采用 Update 语句来进行更新,这种 Update 语句一般采用先将整行数据读出,然后再更新部分字段值,再写回。这种语义在大量数据实时入湖场景非常低效。
Flink Paimon 的 Partial Update 是一种用于流式数据处理的表引擎功能,旨在通过多流并发更新同一张表的部分列,最终实现完整数据的更新。本文是关于这项功能其功能、特点及适用场景的详细介绍:
部分列更新的功能包含如下方面:
以下是常见的部分列更新的适用场景:
以下是一个使用 Partial Update 的示例,例如我们有构建用户主数据的公共维表逻辑。上游有三张用户相关的信息表:
而我们在做业务分层的时候,可能会需要构建以用户为中心的主数据,包含用户的各方面维度信息,以支持各项下游流、批任务的后续处理。
此时上游的三张表的结构,经过简化如下(采用模拟数据源,实际上可能是 Kafka、MySQL-CDC、Paimon 等多种数据上游):
-- 用户基础数据 CREATE TABLE `paimon_pu`.`paimon_pu`.`user_info` ( uid INT, username STRING, -- 用户名 reg_time TIMESTAMP(3), -- 注册时间 PRIMARY KEY (uid) NOT ENFORCED ); -- 用户登录渠道 CREATE TABLE `paimon_pu`.`paimon_pu`.`user_logintype` ( uid INT, logintype STRING, -- 渠道类型,如 github/google ... bind_time TIMESTAMP(3), PRIMARY KEY (uid, logintype) NOT ENFORCED ); -- 用户会员信息 CREATE TABLE `paimon_pu`.`paimon_pu`.`user_vip_info` ( uid INT, is_valid BOOLEAN, -- 会员是否有效 start_time TIMESTAMP(3), -- 会员起始时间 end_time TIMESTAMP(3), -- 会员结束时间 PRIMARY KEY (uid) NOT ENFORCED );
可以用如下的测试脚本,插入测试数据:
-- 插入用户基础数据 INSERT INTO `paimon_pu`.`paimon_pu`.`user_info` VALUES (1001, 'Alice', TIMESTAMP '2023-01-15 10:30:00'), (1002, 'Bob', TIMESTAMP '2023-03-22 14:20:00'), (1003, 'Charlie', TIMESTAMP '2023-05-18 09:15:00'), (1004, 'David', TIMESTAMP '2023-07-01 16:45:00'), (1005, 'Emma', TIMESTAMP '2023-09-30 11:25:00'); -- 插入用户登录渠道数据(一个用户可能有多个登录渠道) INSERT INTO `paimon_pu`.`paimon_pu`.`user_logintype` VALUES (1001, 'github', TIMESTAMP '2023-01-15 10:35:00'), (1001, 'google', TIMESTAMP '2023-01-16 15:20:00'), (1002, 'wechat', TIMESTAMP '2023-03-22 14:25:00'), (1003, 'github', TIMESTAMP '2023-05-18 09:20:00'), (1003, 'google', TIMESTAMP '2023-05-19 10:30:00'), (1004, 'wechat', TIMESTAMP '2023-07-01 16:50:00'), (1005, 'github', TIMESTAMP '2023-09-30 11:30:00'); -- 插入用户会员信息 INSERT INTO `paimon_pu`.`paimon_pu`.`user_vip_info` VALUES (1001, true, TIMESTAMP '2023-02-01 00:00:00', TIMESTAMP '2023-12-31 23:59:59'), (1002, true, TIMESTAMP '2023-04-01 00:00:00', TIMESTAMP '2023-10-31 23:59:59'), (1003, true, TIMESTAMP '2023-06-01 00:00:00', TIMESTAMP '2024-05-31 23:59:59'), (1005, true, TIMESTAMP '2023-10-01 00:00:00', TIMESTAMP '2024-09-30 23:59:59');
我们根据业务描述,希望按照如下模式进行业务建模:
-- 用户宽表,包含以上三个表的所有数据作为公共维度表 CREATE TABLE IF NOT EXISTS `paimon_pu`.`paimon_pu`.`ods_user_wide_info` ( uid INT, -- 用户基础信息 username STRING, -- 用户名 reg_time TIMESTAMP(3), -- 注册时间 -- 用户渠道信息,嵌套结构保存多种登录渠道,以渠道名去重 logintypes ARRAY<ROW<logintype STRING, bind_time TIMESTAMP(3)>>, last_bind_time TIMESTAMP(3), -- 会员信息 vip_is_valid BOOLEAN, -- 会员是否有效 vip_start_time TIMESTAMP(3), -- 会员起始时间 vip_end_time TIMESTAMP(3), -- 会员结束时间 PRIMARY KEY (uid) NOT ENFORCED ) WITH ( 'merge-engine'='partial-update', -- 如果需要下游的流读,需要以 lookup 或者 full-compaction 'changelog-producer' = 'lookup', 'fields.last_bind_time.sequence-group' = 'logintypes', 'fields.logintypes.aggregate-function' = 'nested_update', 'fields.logintypes.nested-key' = 'logintype' );
关键设计:
merge-engine
:采用 partial-update
sequence-group
:采用 last_login_time
作为序列组字段,如果这个时间戳版本增加的话,会对 logintypes
字段进行聚合aggregate-function
:聚合函数采用 nested_update
在子字段中,对 logintype
作为嵌套主键,进行去重更新这样就可以达到当用户绑定了多种渠道之后,会将用户的绑定渠道作为数组置于嵌套字段 logintypes
内。
以下 Flink SQL 从三个数据源各自读取部分列,将其他的不修改的数据置为 NULL,这样可以保证每个数据流都只更新部分数据,最后将三个数据流 UNION ALL 起来,变成一个数据流写入下游 Partial Update 表中:
INSERT INTO `paimon_pu`.`paimon_pu`.`ods_user_wide_info` SELECT uid, username, reg_time, logintypes, last_bind_time, vip_is_valid, vip_start_time, vip_end_time FROM ( SELECT uid, username, reg_time, CAST (NULL AS ARRAY<ROW<logintype STRING, bind_time TIMESTAMP(3)>>) AS logintypes, CAST (NULL AS TIMESTAMP(3)) AS last_bind_time, CAST (NULL AS BOOLEAN) AS vip_is_valid, CAST (NULL AS TIMESTAMP(3)) AS vip_start_time, CAST (NULL AS TIMESTAMP(3)) AS vip_end_time FROM `user_info` UNION ALL SELECT uid, CAST (NULL AS STRING) AS username, CAST (NULL AS TIMESTAMP(3)) AS reg_time, ARRAY[ROW(logintype, bind_time)] AS logintypes, bind_time AS last_bind_time, CAST (NULL AS BOOLEAN) AS vip_is_valid, CAST (NULL AS TIMESTAMP(3)) AS vip_start_time, CAST (NULL AS TIMESTAMP(3)) AS vip_end_time FROM `user_logintype` UNION ALL SELECT uid, CAST (NULL AS STRING) AS username, CAST (NULL AS TIMESTAMP(3)) AS reg_time, CAST (NULL AS ARRAY<ROW<logintype VARCHAR, bind_time TIMESTAMP(3)>>) AS logintypes, CAST (NULL AS TIMESTAMP(3)) AS last_bind_time, is_valid AS vip_is_valid, start_time AS vip_start_time, end_time AS vip_end_time FROM `user_vip_info` );
可以使用如下的类似 SQL 对数据进行探索查询,需要注意的是此类 SQL 不能直接在 Flink SQL 任务中调试运行,需要写相关的 Insert 语句到 print 的数据下游才能正常调试。
-- 用户宽表,包含以上三个表的所有数据作为公共维度表 CREATE TABLE IF NOT EXISTS `print_result` ( uid INT, -- 用户基础信息 username STRING, -- 用户名 reg_time TIMESTAMP(3), -- 注册时间 -- 用户渠道信息,嵌套结构保存多种登录渠道,以渠道名去重 logintype STRING, bind_time TIMESTAMP(3), -- 会员信息 vip_is_valid BOOLEAN, -- 会员是否有效 vip_start_time TIMESTAMP(3), -- 会员起始时间 vip_end_time TIMESTAMP(3), -- 会员结束时间 PRIMARY KEY (uid) NOT ENFORCED ) WITH ( 'connector'='print' ); -- 使用 UNNEST 将数组解嵌套 -- 注意:当前 Flink SQL 不支持直接运行此类查询 SQL,需要 insert 到 print sink 中 INSERT INTO `print_result` SELECT uid, username, reg_time, logintype, bind_time, vip_is_valid, vip_start_time, vip_end_time FROM `paimon_pu`.`paimon_pu`.`ods_user_wide_info`, UNNEST(logintypes) AS lt(logintype, bind_time);
查询结果可以看到批作业处理成功,输出内容如下:
通过指定 'merge-engine' = 'partial-update'
,用户可以通过多次更新来逐步完成记录的列更新。这是通过使用相同主键下的最新数据逐个更新值字段来实现的。然而,空值在这个过程中不会被覆盖。例如,假设 Paimon 接收到三条记录:
<1, 23.0, 10, NULL>
<1, NULL, NULL, 'This is a book'>
<1, 25.2, NULL, NULL>
假设第一列是主键,最终结果将是 <1, 25.2, 10, 'This is a book'>
。
对于流查询,partial-update
合并引擎必须与 lookup
或 full-compaction
(参考 Changelog 产出机制)一起使用。值得注意的是,input
也支持产出 Changelog,但只返回输入记录,不能返回更新后的数据。
另外,默认情况下,部分更新不能接受删除记录,你可以选择以下解决方案之一:
序列字段可能无法解决具有多个流更新的部分更新表的无序问题,因为在多流更新期间,序列字段可能会被另一个流的最新数据覆盖。因此,我们为部分更新表引入了序列组机制。它可以解决:
请参见示例:
CREATE TABLE t ( k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'merge-engine'='partial-update', 'fields.g_1.sequence-group'='a,b', 'fields.g_2.sequence-group'='c,d' ); INSERT INTO t VALUES (1, 1, 1, 1, 1, 1, 1); -- g_2 为空,c, d 不应更新 INSERT INTO t VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT)); SELECT * FROM t; -- 输出 1, 2, 2, 2, 1, 1, 1 -- g_1 较小,a, b 不应更新 INSERT INTO t VALUES (1, 3, 3, 1, 3, 3, 3); -- 输出 1, 2, 2, 2, 3, 3, 3
对于 fields.<field-name>.sequence-group
,有效的比较数据类型包括:DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP 和 TIMESTAMP_LTZ。
你可以为输入字段指定聚合函数,所有在 Paimon 聚合表功能 中支持的函数都可以使用。请参见示例:
CREATE TABLE t ( k INT, a INT, b INT, c INT, d INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'merge-engine'='partial-update', 'fields.a.sequence-group' = 'b', 'fields.b.aggregate-function' = 'first_value', 'fields.c.sequence-group' = 'd', 'fields.d.aggregate-function' = 'sum' ); -- 输入 1,1,1,null,null --> b 字段 first_value 更新为 1 INSERT INTO t VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT)); -- 输入 1,null,null,1,1 --> d 字段聚合值为 1 INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1); -- 输入 1,2,2,null,null --> b 字段 first_value 不更新 INSERT INTO t VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT)); -- 输入 1,null,null,2,2 --> d 字段聚合值为 3 INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2); -- 表最终结果输出 1, 2, 1, 2, 3
Flink Paimon 的 Partial Update 是一种高效的多流并发更新机制,适用于宽表构建、实时数据打宽、流批一体等场景。其核心优势在于避免了全量 Join 操作,减少了状态压力,同时通过序列组机制解决了多流更新的乱序问题。对于需要高效更新和打宽数据的业务场景,可以尝试使用 Partial Update 功能。另外,更多功能也请关注 Paimon 官方文档。
问题现象:在流式任务消费上游的部分列更新表的时候报错如下,java.lang.RuntimeException: Partial update streaming reading is not supported. You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading. ('input' changelog producer is also supported, but only returns input records.)
解决方案:部分列更新表如果没有产出 changelog,是无法进行流式读取的。建议使用 lookup
或者full-compaction
两种模式进行变更日志订阅。另外如果使用input
的产出模式的话,一般是无法获得真正的变更日志,这种情况下获得的是上游的插入原始内容。