You need to enable JavaScript to run this app.
导航
Paimon 部分列更新功能
最近更新时间:2025.02.18 16:04:10首次发布时间:2025.02.18 16:04:10

1. 概述

部分列更新,主要是指直接更新表中某些字段值,而不是全部的字段值。可以采用 Update 语句来进行更新,这种 Update 语句一般采用先将整行数据读出,然后再更新部分字段值,再写回。这种语义在大量数据实时入湖场景非常低效。
Flink Paimon 的 Partial Update 是一种用于流式数据处理的表引擎功能,旨在通过多流并发更新同一张表的部分列,最终实现完整数据的更新。本文是关于这项功能其功能、特点及适用场景的详细介绍:

Image

部分列更新的功能包含如下方面:

  1. 部分列更新:Partial Update 允许多个 Flink 流任务并发更新同一张表的不同列,最终将这些部分更新合并为一行完整的数据。例如,流 A 更新列 A 和列 B,流 B 更新列 C 和列 D,最终生成一行包含所有列的数据。
  2. 多流并发写入:多个 Flink 流任务可以通过 UNION ALL 的方式合并为一个 Job,写入同一张 Paimon 表,避免因多 Job 并发写入导致的 Compaction 任务拆分问题。
  3. 无需全量 Join:传统的宽表构建通常需要通过双流或多流 Join 实现,而 Partial Update 避免了 Join 操作,减少了 Flink 任务的状态压力,简化了任务维护。
  4. 支持 Changelog 订阅:支持与 Lookup 或 Full-Compaction 的 Changelog Producer 结合使用,获取完整的数据变更日志。
  5. 支持序列组(Sequence Group):通过序列组机制,Partial Update 可以解决多流更新时的乱序问题。每个流可以定义自己的序列组,确保只有符合条件的更新才会生效。

2. 适用场景

以下是常见的部分列更新的适用场景:

  1. 多流实时动态更新:适用于需要频繁更新某些字段的场景,例如用户标签表中的行为信息更新,能够支持广告或推荐系统的实时分析和决策。
  2. 大宽表拼接:在将多张源表的数据合并成一张大宽表时,可以通过部分列更新来实现,避免全量 Join 操作带来的性能开销。例如,电商平台中需要将订单表、支付表、商品表等数据合并为一张宽表,供下游分析使用。
  3. 数据修正:适用于需要修正某些数据的场景,部分列更新能够精准定位并更新目标字段,有效减少更新开销。
  4. 高频并发写入:支持高频的并发写入,适用于需要实时更新大量行但仅涉及少数列的场景,例如实时日志处理或监控数据更新。
  5. 性能优化:在更新少数列时,部分列更新可以显著提升性能,尤其是在涉及大量行的情况下,能够减少不必要的计算和存储开销。

3. 使用示例

3.1 业务说明

以下是一个使用 Partial Update 的示例,例如我们有构建用户主数据的公共维表逻辑。上游有三张用户相关的信息表:

  1. 用户基础信息:用户注册后的用户名、基本信息等内容,为用户表的主要信息
  2. 用户渠道信息:用户登录的渠道,比如 github / google account / phone ... 等,一个用户可能会绑定多个渠道登录方式
  3. 用户会员信息:用户注册后如果注册会员,则会有会员信息表,一个用户可能会有1个或者0个会员信息

而我们在做业务分层的时候,可能会需要构建以用户为中心的主数据,包含用户的各方面维度信息,以支持各项下游流、批任务的后续处理。

Image

此时上游的三张表的结构,经过简化如下(采用模拟数据源,实际上可能是 Kafka、MySQL-CDC、Paimon 等多种数据上游):

-- 用户基础数据
CREATE TABLE `paimon_pu`.`paimon_pu`.`user_info` (
    uid INT,
    username STRING, -- 用户名
    reg_time TIMESTAMP(3), -- 注册时间
    PRIMARY KEY (uid) NOT ENFORCED
);
-- 用户登录渠道
CREATE TABLE `paimon_pu`.`paimon_pu`.`user_logintype` (
    uid INT,
    logintype STRING, -- 渠道类型,如 github/google ...
    bind_time TIMESTAMP(3),
    PRIMARY KEY (uid, logintype) NOT ENFORCED
);
-- 用户会员信息
CREATE TABLE `paimon_pu`.`paimon_pu`.`user_vip_info` (
    uid INT,
    is_valid BOOLEAN, -- 会员是否有效
    start_time TIMESTAMP(3), -- 会员起始时间
    end_time TIMESTAMP(3), -- 会员结束时间
    PRIMARY KEY (uid) NOT ENFORCED
);

可以用如下的测试脚本,插入测试数据:

-- 插入用户基础数据
INSERT INTO `paimon_pu`.`paimon_pu`.`user_info` VALUES 
(1001, 'Alice', TIMESTAMP '2023-01-15 10:30:00'),
(1002, 'Bob', TIMESTAMP '2023-03-22 14:20:00'),
(1003, 'Charlie', TIMESTAMP '2023-05-18 09:15:00'),
(1004, 'David', TIMESTAMP '2023-07-01 16:45:00'),
(1005, 'Emma', TIMESTAMP '2023-09-30 11:25:00');

-- 插入用户登录渠道数据(一个用户可能有多个登录渠道)
INSERT INTO `paimon_pu`.`paimon_pu`.`user_logintype` VALUES 
(1001, 'github', TIMESTAMP '2023-01-15 10:35:00'),
(1001, 'google', TIMESTAMP '2023-01-16 15:20:00'),
(1002, 'wechat', TIMESTAMP '2023-03-22 14:25:00'),
(1003, 'github', TIMESTAMP '2023-05-18 09:20:00'),
(1003, 'google', TIMESTAMP '2023-05-19 10:30:00'),
(1004, 'wechat', TIMESTAMP '2023-07-01 16:50:00'),
(1005, 'github', TIMESTAMP '2023-09-30 11:30:00');

-- 插入用户会员信息
INSERT INTO `paimon_pu`.`paimon_pu`.`user_vip_info` VALUES 
(1001, true, TIMESTAMP '2023-02-01 00:00:00', TIMESTAMP '2023-12-31 23:59:59'),
(1002, true, TIMESTAMP '2023-04-01 00:00:00', TIMESTAMP '2023-10-31 23:59:59'),
(1003, true, TIMESTAMP '2023-06-01 00:00:00', TIMESTAMP '2024-05-31 23:59:59'),
(1005, true, TIMESTAMP '2023-10-01 00:00:00', TIMESTAMP '2024-09-30 23:59:59');

3.2 业务建模

我们根据业务描述,希望按照如下模式进行业务建模:

-- 用户宽表,包含以上三个表的所有数据作为公共维度表
CREATE TABLE IF NOT EXISTS `paimon_pu`.`paimon_pu`.`ods_user_wide_info` (
    uid INT,
    -- 用户基础信息
    username STRING, -- 用户名
    reg_time TIMESTAMP(3), -- 注册时间
    -- 用户渠道信息,嵌套结构保存多种登录渠道,以渠道名去重
    logintypes ARRAY<ROW<logintype STRING, bind_time TIMESTAMP(3)>>,
    last_bind_time TIMESTAMP(3), 
    -- 会员信息
    vip_is_valid BOOLEAN, -- 会员是否有效
    vip_start_time TIMESTAMP(3), -- 会员起始时间
    vip_end_time TIMESTAMP(3), -- 会员结束时间
    PRIMARY KEY (uid) NOT ENFORCED
) WITH (
    'merge-engine'='partial-update',
    -- 如果需要下游的流读,需要以 lookup 或者 full-compaction 
    'changelog-producer' = 'lookup', 
    'fields.last_bind_time.sequence-group' = 'logintypes',
    'fields.logintypes.aggregate-function' = 'nested_update',
    'fields.logintypes.nested-key' = 'logintype'
);

关键设计:

  1. merge-engine:采用 partial-update
  2. sequence-group:采用 last_login_time 作为序列组字段,如果这个时间戳版本增加的话,会对 logintypes字段进行聚合
  3. aggregate-function:聚合函数采用 nested_update在子字段中,对 logintype作为嵌套主键,进行去重更新

这样就可以达到当用户绑定了多种渠道之后,会将用户的绑定渠道作为数组置于嵌套字段 logintypes内。

3.3 部分列更新

以下 Flink SQL 从三个数据源各自读取部分列,将其他的不修改的数据置为 NULL,这样可以保证每个数据流都只更新部分数据,最后将三个数据流 UNION ALL 起来,变成一个数据流写入下游 Partial Update 表中:

INSERT INTO `paimon_pu`.`paimon_pu`.`ods_user_wide_info`
SELECT
  uid, username, reg_time, logintypes, last_bind_time, vip_is_valid, vip_start_time, vip_end_time
FROM (
    SELECT 
      uid, 
      username,
      reg_time, 
      CAST (NULL AS ARRAY<ROW<logintype STRING, bind_time TIMESTAMP(3)>>) AS logintypes,
      CAST (NULL AS TIMESTAMP(3)) AS last_bind_time,
      CAST (NULL AS BOOLEAN) AS vip_is_valid,
      CAST (NULL AS TIMESTAMP(3)) AS vip_start_time,
      CAST (NULL AS TIMESTAMP(3)) AS vip_end_time
    FROM `user_info`
    
    UNION ALL 
    
    SELECT 
      uid, 
      CAST (NULL AS STRING) AS username,
      CAST (NULL AS TIMESTAMP(3)) AS reg_time,
      ARRAY[ROW(logintype, bind_time)] AS logintypes, 
      bind_time AS last_bind_time,
      CAST (NULL AS BOOLEAN) AS vip_is_valid,
      CAST (NULL AS TIMESTAMP(3)) AS vip_start_time,
      CAST (NULL AS TIMESTAMP(3)) AS vip_end_time
    FROM `user_logintype`
    
    UNION ALL 
      
    SELECT 
      uid, 
      CAST (NULL AS STRING) AS username,
      CAST (NULL AS TIMESTAMP(3)) AS reg_time,
      CAST (NULL AS ARRAY<ROW<logintype VARCHAR, bind_time TIMESTAMP(3)>>) AS logintypes,
      CAST (NULL AS TIMESTAMP(3)) AS last_bind_time,
      is_valid AS vip_is_valid,
      start_time AS vip_start_time,
      end_time AS vip_end_time
    FROM `user_vip_info`
);

3.4 结果验证

可以使用如下的类似 SQL 对数据进行探索查询,需要注意的是此类 SQL 不能直接在 Flink SQL 任务中调试运行,需要写相关的 Insert 语句到 print 的数据下游才能正常调试。

-- 用户宽表,包含以上三个表的所有数据作为公共维度表
CREATE TABLE IF NOT EXISTS `print_result` (
    uid INT,
    -- 用户基础信息
    username STRING, -- 用户名
    reg_time TIMESTAMP(3), -- 注册时间
    -- 用户渠道信息,嵌套结构保存多种登录渠道,以渠道名去重
    logintype STRING,
    bind_time TIMESTAMP(3),
    -- 会员信息
    vip_is_valid BOOLEAN, -- 会员是否有效
    vip_start_time TIMESTAMP(3), -- 会员起始时间
    vip_end_time TIMESTAMP(3), -- 会员结束时间
    PRIMARY KEY (uid) NOT ENFORCED
) WITH (
    'connector'='print'
);

-- 使用 UNNEST 将数组解嵌套
-- 注意:当前 Flink SQL 不支持直接运行此类查询 SQL,需要 insert 到 print sink 中
INSERT INTO `print_result` 
SELECT uid, username, reg_time, logintype, bind_time, vip_is_valid,  vip_start_time, vip_end_time
FROM `paimon_pu`.`paimon_pu`.`ods_user_wide_info`, UNNEST(logintypes) AS lt(logintype, bind_time);

查询结果可以看到批作业处理成功,输出内容如下:
Image

4. 功能详解

4.1 功能用法

通过指定 'merge-engine' = 'partial-update',用户可以通过多次更新来逐步完成记录的列更新。这是通过使用相同主键下的最新数据逐个更新值字段来实现的。然而,空值在这个过程中不会被覆盖。例如,假设 Paimon 接收到三条记录:

  • <1, 23.0, 10, NULL>
  • <1, NULL, NULL, 'This is a book'>
  • <1, 25.2, NULL, NULL>

假设第一列是主键,最终结果将是 <1, 25.2, 10, 'This is a book'>

对于流查询,partial-update 合并引擎必须与 lookupfull-compaction (参考 Changelog 产出机制)一起使用。值得注意的是,input 也支持产出 Changelog,但只返回输入记录,不能返回更新后的数据。

另外,默认情况下,部分更新不能接受删除记录,你可以选择以下解决方案之一:

  • 配置 'ignore-delete' 来忽略删除记录。
  • 配置 'sequence-group' 来撤销部分列。

4.2 序列组(Sequence-Group)

序列字段可能无法解决具有多个流更新的部分更新表的无序问题,因为在多流更新期间,序列字段可能会被另一个流的最新数据覆盖。因此,我们为部分更新表引入了序列组机制。它可以解决:

  1. 多流更新期间的无序问题。每个流定义其自己的序列组。
  2. 真正的部分更新,而不仅仅是非空更新,比如需要支持 Null 值覆盖的场景。

请参见示例:

CREATE TABLE t (
    k INT,
    a INT,
    b INT,
    g_1 INT,
    c INT,
    d INT,
    g_2 INT,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'merge-engine'='partial-update',
    'fields.g_1.sequence-group'='a,b',
    'fields.g_2.sequence-group'='c,d'
);

INSERT INTO t VALUES (1, 1, 1, 1, 1, 1, 1);

-- g_2 为空,c, d 不应更新
INSERT INTO t VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));

SELECT * FROM t; -- 输出 1, 2, 2, 2, 1, 1, 1

-- g_1 较小,a, b 不应更新
INSERT INTO t VALUES (1, 3, 3, 1, 3, 3, 3);

-- 输出 1, 2, 2, 2, 3, 3, 3

对于 fields.<field-name>.sequence-group,有效的比较数据类型包括:DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP 和 TIMESTAMP_LTZ。

4.3 部分列更新支持聚合

你可以为输入字段指定聚合函数,所有在 Paimon 聚合表功能 中支持的函数都可以使用。请参见示例:

CREATE TABLE t (
          k INT,
          a INT,
          b INT,
          c INT,
          d INT,
          PRIMARY KEY (k) NOT ENFORCED
) WITH (
     'merge-engine'='partial-update',
     'fields.a.sequence-group' = 'b',
     'fields.b.aggregate-function' = 'first_value',
     'fields.c.sequence-group' = 'd',
     'fields.d.aggregate-function' = 'sum'
 );
 
-- 输入 1,1,1,null,null --> b 字段 first_value 更新为 1
INSERT INTO t VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
-- 输入 1,null,null,1,1 --> d 字段聚合值为 1
INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
-- 输入 1,2,2,null,null --> b 字段 first_value 不更新
INSERT INTO t VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
-- 输入 1,null,null,2,2 --> d 字段聚合值为 3
INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);

-- 表最终结果输出 1, 2, 1, 2, 3

Flink Paimon 的 Partial Update 是一种高效的多流并发更新机制,适用于宽表构建、实时数据打宽、流批一体等场景。其核心优势在于避免了全量 Join 操作,减少了状态压力,同时通过序列组机制解决了多流更新的乱序问题。对于需要高效更新和打宽数据的业务场景,可以尝试使用 Partial Update 功能。另外,更多功能也请关注 Paimon 官方文档

5. 常见问题

5.1 部分列更新表不支持流读

问题现象:在流式任务消费上游的部分列更新表的时候报错如下,java.lang.RuntimeException: Partial update streaming reading is not supported. You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading. ('input' changelog producer is also supported, but only returns input records.)
Image

解决方案:部分列更新表如果没有产出 changelog,是无法进行流式读取的。建议使用 lookup或者full-compaction两种模式进行变更日志订阅。另外如果使用input的产出模式的话,一般是无法获得真正的变更日志,这种情况下获得的是上游的插入原始内容。