You need to enable JavaScript to run this app.
导航
Paimon 维表打宽功能
最近更新时间:2025.02.18 16:04:10首次发布时间:2025.02.18 16:04:10

1. 概述

1.1 维表查询

Flink SQL 任务中的实时维表打宽是一种非常常见的流式计算场景,通过实时数据流与外部维表进行实时关联,无需额外状态管理,语义简单且支持高 QPS 吞吐。常见的维表查询场景包含但不限于:

  1. 用户行为分析:用户行为日志关联用户的用户名、基础信息、历史行为等。
  2. 实时推荐系统:用户的实时操作与推荐模型中的特征数据进行关联,生成个性化推荐结果。
  3. 金融风控:交易数据与风险控制规则库进行关联,实时检测和预警异常交易行为。
  4. 物联网监控:设备数据与设备配置或状态信息进行关联,实时监控设备运行情况并及时发现故障。

1.2 Paimon 作为维表优势

维表打宽一般需要引入高 QPS 点查的远端存储(如 MySQL、HBase、Redis 等),增加了运维复杂性。在某些业务场景中,维表数据还需支持下游流批读写,可能额外引入 Dump Hive、Dump MQ 等链路,进一步扩展了其应用范围和复杂性。Paimon 作为基于 LSM-Tree 的存储系统,支持流批读写的同时,能非常好的支撑维表查询的场景。
Image
使用 Paimon 表直接作为维表的优势在于:

  • 性能良好:基于 LSM-Tree 的存储结构,提供高效的数据查找效率
  • 场景丰富:支持流批读写,简化维表维护链路
  • 成本较低:无需独占的计算资源,直接基于远端存储(HDFS、TOS)、本地文件、本地内存构建多级缓存,优化读取性能

2. 基础示例

以下示例主要使用 Flink SQL 的开发方式,因为篇幅原因本文不再赘述具体操作步骤,可以参考 Paimon 使用 Fileystem Catalog 开发

2.1 创建维表

在如下 SQL 中,我们构建了一张 dim_orders 表作为订单信息的维表。其中订单表的主键和分桶键都是 order_id ,即订单 id。我们后续在实时维表打宽作业的过程中会基于订单 id 进行关联,获取订单的关联信息。

CREATE CATALOG paimon_dim
WITH
  (
    'type' = 'paimon',
    'warehouse' = 'tos://flink-cwz-paimon/paimon_dim'
  );
  
CREATE DATABASE IF NOT EXISTS paimon_dim.dim_test;

-- 创建维表数据表
CREATE TABLE IF NOT EXISTS
  `paimon_dim`.`dim_test`.`dim_orders` (
    `order_id` INT,
    `order_name` STRING,
    `order_product_id` INT,
    `order_customer_id` INT,
    `order_status` STRING,
    `create_date` TIMESTAMP,
    `create_ts` INT,
    PRIMARY key (`order_id`) NOT ENFORCED
  )
WITH
  (
    'bucket' = '20',
    'bucket-key' = 'order_id',
    -- 为了控制数据量,可以将创建日期两天前数据进行过期
    'record-level.expire-time' = '2 d',
    'record-level.time-field' = 'create_ts'
  );

2.2 实时更新维表

当创建好维表之后,我们可以持续向维表中更新数据,代表正常的订单变更业务:

-- 使用 datagen 模拟数据写入维表
CREATE TABLE
  datagen_dim_source (
    `order_id` INT,
    `order_name` STRING,
    `order_product_id` INT,
    `order_customer_id` INT,
    `order_status` STRING
  )
WITH
  (
    'connector' = 'datagen'
  );

-- 生成数据实时写入维表
INSERT INTO
  `paimon_dim`.`dim_test`.`dim_orders`
SELECT
  order_id, order_name, order_product_id, order_customer_id, order_status, CURRENT_TIMESTAMP AS create_date, CAST(UNIX_TIMESTAMP () AS INT) AS create_ts
FROM
  datagen_dim_source;

2.3 模拟实时数据流

首先我们使用 datagen模拟实时数据流。注意这里采用 create_date作为处理时间。

-- 模拟数据流
CREATE TABLE
  datagen_source (
    `product_id` INT,
    `product_name` STRING,
    `product_category_id` INT,
    `product_order_id` INT,
    `product_status` STRING,
    `create_date` AS proctime ()
  )
WITH
  ('connector' = 'datagen');
  
-- 实时数据打宽后的下游宽表,测试中使用 print 打印到控制台
CREATE TABLE
  print_sink (
    `product_id` INT,
    `product_name` STRING,
    `product_category_id` INT,
    `product_order_id` INT,
    `product_status` STRING,
    `create_date` TIMESTAMP,
    `order_name` STRING,
    `orders_customer_id` INT,
    `order_status` STRING
  )
WITH
  ('connector' = 'print');

2.4 实时 lookup join

以下 SQL 语句定义了一个实时打宽的视图 trade_orde_view,通过将流式数据源 datagen_source 与 Paimon 维表 dim_orders 进行关联,实现数据的实时打宽。

-- 实时打宽后过滤 order_name 不为空的数据写入下游
INSERT INTO
  print_sink
SELECT
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` 
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id
WHERE
  orders.order_name IS NOT NULL;  

其中维表关联的具体逻辑解释如下:

  • 维表paimon_dim.dim_test.dim_orders 是上文中定义的 Paimon 维表。
  • 时间语义FOR SYSTEM_TIME AS OF gen.create_date 表示根据流数据中的 create_date 字段,查找维表在 create_date 时刻的快照数据。
  • 关联条件gen.product_order_id = orders.order_id 表示将流数据中的 product_order_id 与维表中的 order_id 进行关联。

3. 优化技巧

3.1 Lookup 延迟重试

如果 datagen_source 表(主表)的记录由于 orders 表(维表)的对应数据未准备好而无法完成 Join 操作,可以考虑使用 Flink 的 Lookup 延迟重试策略。该功能仅适用于 Flink-1.16-volcano 及以上版本。

-- 使用 hint 进行 Lookup 重试
SELECT /*+ LOOKUP('table'='orders', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` 
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id;

3.2 Lookup 异步延迟重试

同步重试的问题是,一条记录会阻塞后续记录,导致整个作业被阻塞。你可以考虑使用 异步(async) + 允许乱序(allow_unordered)来避免阻塞,这样 Join 缺失的记录将不再阻塞其他记录。

-- 使用 hint 进行 Lookup 重试
SELECT /*+ LOOKUP('table'='orders', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id;

3.3 动态分区

在传统数据仓库中,每个分区通常维护最新的全量数据,因此这种分区表只需要关联最新的分区。Paimon 专门为此场景开发了 max_pt 功能。

-- 如果 orders 表是每天一个快照的全量分区
CREATE TABLE IF NOT EXISTS
  `paimon_dim`.`dim_test`.`dim_orders` (
    `order_id` INT,
    `order_name` STRING,
    `order_product_id` INT,
    `order_customer_id` INT,
    `order_status` STRING,
    `create_date` TIMESTAMP,
    `create_ts` INT,
    `dt` STRING,
    PRIMARY key (`order_id`, `dt`) NOT ENFORCED
  ) PARTITIONED BY (dt);

在这种情况下,要使用 max_pt进行选择最新的分区进行维表关联:

-- 使用 hint 选择最新分区
SELECT 
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id;

3.5 缓存策略

在不同的 Look Join 场景中有部分缓存和全部缓存两种策略,可以提供开发者更好的性能调优选项。

3.5.1 部分缓存

部分缓存模式是在 Join 的过程中按需缓存命中的数据文件,如图所示:
Image
通过建表语句中 WITH 参数 'lookup.cache'='auto' 来开启,当满足以下两种情况:

  • 关联表为固定分桶模式的主键表
  • 关联表表的主键和 Join Key 一致

此时会自动选择部分缓存(Partial Cache)模式。而不满足这两个条件时会选择全部缓存(Full Cache)模式。

部分缓存能够利用 LSM-Tree 的主键有序性,实现维表缓存数据按需加载,避免全量数据加载,任务初始化更快。

注意:仅支持主键表的主键关联场景,如果关联 Key 不是主键,则无法使用。

3.5.1 全部缓存

全部缓存会批量将 Paimon 表数据全部 Load 到 RocksDB 中,这样能够在关联 Key 非主键的情况下,能够 Lookup 成功:
Image
通过参数 'lookup.cache'='full' 来开启
全部缓存模式,支持主键表的主键关联和非主键关联两种模式,也支持非主键表的关联。但是劣势是初始化加载时间较长,冷启动现象明显。

3.6 Bucket Shuffle

火山引擎的 Paimon 版本为社区提供了 Bucket Shuffle 功能,极大地提升了 Lookup Join 大规模维表时候的性能。Bucket Shuffle 的原理如下:

Image

Image

没有开启 Bucket Shuffle 功能
开启 Bucket Shuffle 功能

可以看出,在开启 Bucket Shuffle 的 Lookup Join 过程中,主数据会根据 Join Key 进行 Hash 分组处理,这样在每个分组中只要缓存对应 Bucket 数据,大大减少了内存用量,减少了缓存淘汰的概率。可以支持更大规模的维表。开启方法如下,在 hint 中设置 'shuffle' = 'true'

-- 使用 hint 选择最新分区
SELECT /*+ LOOKUP('table'='orders', 'shuffle'='true') */ 
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` 
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id;

适用场景:

  • 固定分桶表:主键表和 Append 表(非主键表)都支持
  • Join Key 要包含所有的 Bucket Key

4. 参数详解

以下是常见的维表的建表 WITH 参数设置:

参数

说明

数据类型

必填

默认值

备注

lookup.cache-max-memory-size

Paimon维表的内存缓存大小。

String

256 MB

该参数值会同时影响维表缓存大小和lookup changelog-producer的缓存大小,两个机制的缓存大小都由该参数配置。

lookup.cache

Paimon维表缓存模式

Enum

AUTO

参数支持 AUTO, FULL两个枚举值, 当满足条件时会优先使用Partial Cache模式

lookup.continuous.discovery-interval

维表数据刷新间隔

Duration

10s

指定维表增量数据刷新间隔

lookup.cache-max-disk-size

维表本地缓存最大空间占用

MemorySize

无限制

指定维表和Lookup Compaction本地磁盘空间占用上限, 目前仅适用于Partial Cache场景

lookup.local-file-type

维表文件类型

Enum

HASH

仅针对于Partial Cache场景, 支持 HASH 和 SORT 两种模式. Sort模式初始化更快