Flink SQL 任务中的实时维表打宽是一种非常常见的流式计算场景,通过实时数据流与外部维表进行实时关联,无需额外状态管理,语义简单且支持高 QPS 吞吐。常见的维表查询场景包含但不限于:
维表打宽一般需要引入高 QPS 点查的远端存储(如 MySQL、HBase、Redis 等),增加了运维复杂性。在某些业务场景中,维表数据还需支持下游流批读写,可能额外引入 Dump Hive、Dump MQ 等链路,进一步扩展了其应用范围和复杂性。Paimon 作为基于 LSM-Tree 的存储系统,支持流批读写的同时,能非常好的支撑维表查询的场景。
使用 Paimon 表直接作为维表的优势在于:
以下示例主要使用 Flink SQL 的开发方式,因为篇幅原因本文不再赘述具体操作步骤,可以参考 Paimon 使用 Fileystem Catalog 开发。
在如下 SQL 中,我们构建了一张 dim_orders 表作为订单信息的维表。其中订单表的主键和分桶键都是 order_id ,即订单 id。我们后续在实时维表打宽作业的过程中会基于订单 id 进行关联,获取订单的关联信息。
CREATE CATALOG paimon_dim WITH ( 'type' = 'paimon', 'warehouse' = 'tos://flink-cwz-paimon/paimon_dim' ); CREATE DATABASE IF NOT EXISTS paimon_dim.dim_test; -- 创建维表数据表 CREATE TABLE IF NOT EXISTS `paimon_dim`.`dim_test`.`dim_orders` ( `order_id` INT, `order_name` STRING, `order_product_id` INT, `order_customer_id` INT, `order_status` STRING, `create_date` TIMESTAMP, `create_ts` INT, PRIMARY key (`order_id`) NOT ENFORCED ) WITH ( 'bucket' = '20', 'bucket-key' = 'order_id', -- 为了控制数据量,可以将创建日期两天前数据进行过期 'record-level.expire-time' = '2 d', 'record-level.time-field' = 'create_ts' );
当创建好维表之后,我们可以持续向维表中更新数据,代表正常的订单变更业务:
-- 使用 datagen 模拟数据写入维表 CREATE TABLE datagen_dim_source ( `order_id` INT, `order_name` STRING, `order_product_id` INT, `order_customer_id` INT, `order_status` STRING ) WITH ( 'connector' = 'datagen' ); -- 生成数据实时写入维表 INSERT INTO `paimon_dim`.`dim_test`.`dim_orders` SELECT order_id, order_name, order_product_id, order_customer_id, order_status, CURRENT_TIMESTAMP AS create_date, CAST(UNIX_TIMESTAMP () AS INT) AS create_ts FROM datagen_dim_source;
首先我们使用 datagen
模拟实时数据流。注意这里采用 create_date
作为处理时间。
-- 模拟数据流 CREATE TABLE datagen_source ( `product_id` INT, `product_name` STRING, `product_category_id` INT, `product_order_id` INT, `product_status` STRING, `create_date` AS proctime () ) WITH ('connector' = 'datagen'); -- 实时数据打宽后的下游宽表,测试中使用 print 打印到控制台 CREATE TABLE print_sink ( `product_id` INT, `product_name` STRING, `product_category_id` INT, `product_order_id` INT, `product_status` STRING, `create_date` TIMESTAMP, `order_name` STRING, `orders_customer_id` INT, `order_status` STRING ) WITH ('connector' = 'print');
以下 SQL 语句定义了一个实时打宽的视图 trade_orde_view
,通过将流式数据源 datagen_source
与 Paimon 维表 dim_orders
进行关联,实现数据的实时打宽。
-- 实时打宽后过滤 order_name 不为空的数据写入下游 INSERT INTO print_sink SELECT gen.product_id, gen.product_name, gen.product_category_id, gen.product_order_id, gen.product_status, gen.create_date, orders.order_name, orders.order_customer_id, order_status FROM datagen_source AS gen JOIN `paimon_dim`.`dim_test`.`dim_orders` FOR SYSTEM_TIME AS OF gen.create_date AS orders ON gen.product_order_id = orders.order_id WHERE orders.order_name IS NOT NULL;
其中维表关联的具体逻辑解释如下:
paimon_dim.dim_test.dim_orders
是上文中定义的 Paimon 维表。FOR SYSTEM_TIME AS OF gen.create_date
表示根据流数据中的 create_date
字段,查找维表在 create_date
时刻的快照数据。gen.product_order_id = orders.order_id
表示将流数据中的 product_order_id
与维表中的 order_id
进行关联。如果 datagen_source
表(主表)的记录由于 orders
表(维表)的对应数据未准备好而无法完成 Join 操作,可以考虑使用 Flink 的 Lookup 延迟重试策略。该功能仅适用于 Flink-1.16-volcano 及以上版本。
-- 使用 hint 进行 Lookup 重试 SELECT /*+ LOOKUP('table'='orders', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */ gen.product_id, gen.product_name, gen.product_category_id, gen.product_order_id, gen.product_status, gen.create_date, orders.order_name, orders.order_customer_id, order_status FROM datagen_source AS gen JOIN `paimon_dim`.`dim_test`.`dim_orders` FOR SYSTEM_TIME AS OF gen.create_date AS orders ON gen.product_order_id = orders.order_id;
同步重试的问题是,一条记录会阻塞后续记录,导致整个作业被阻塞。你可以考虑使用 异步(async) + 允许乱序(allow_unordered)来避免阻塞,这样 Join 缺失的记录将不再阻塞其他记录。
-- 使用 hint 进行 Lookup 重试 SELECT /*+ LOOKUP('table'='orders', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */ gen.product_id, gen.product_name, gen.product_category_id, gen.product_order_id, gen.product_status, gen.create_date, orders.order_name, orders.order_customer_id, order_status FROM datagen_source AS gen JOIN `paimon_dim`.`dim_test`.`dim_orders` /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */ FOR SYSTEM_TIME AS OF gen.create_date AS orders ON gen.product_order_id = orders.order_id;
在传统数据仓库中,每个分区通常维护最新的全量数据,因此这种分区表只需要关联最新的分区。Paimon 专门为此场景开发了 max_pt
功能。
-- 如果 orders 表是每天一个快照的全量分区 CREATE TABLE IF NOT EXISTS `paimon_dim`.`dim_test`.`dim_orders` ( `order_id` INT, `order_name` STRING, `order_product_id` INT, `order_customer_id` INT, `order_status` STRING, `create_date` TIMESTAMP, `create_ts` INT, `dt` STRING, PRIMARY key (`order_id`, `dt`) NOT ENFORCED ) PARTITIONED BY (dt);
在这种情况下,要使用 max_pt
进行选择最新的分区进行维表关联:
-- 使用 hint 选择最新分区 SELECT gen.product_id, gen.product_name, gen.product_category_id, gen.product_order_id, gen.product_status, gen.create_date, orders.order_name, orders.order_customer_id, order_status FROM datagen_source AS gen JOIN `paimon_dim`.`dim_test`.`dim_orders` /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */ FOR SYSTEM_TIME AS OF gen.create_date AS orders ON gen.product_order_id = orders.order_id;
在不同的 Look Join 场景中有部分缓存和全部缓存两种策略,可以提供开发者更好的性能调优选项。
部分缓存模式是在 Join 的过程中按需缓存命中的数据文件,如图所示:
通过建表语句中 WITH 参数 'lookup.cache'='auto'
来开启,当满足以下两种情况:
此时会自动选择部分缓存(Partial Cache)模式。而不满足这两个条件时会选择全部缓存(Full Cache)模式。
部分缓存能够利用 LSM-Tree 的主键有序性,实现维表缓存数据按需加载,避免全量数据加载,任务初始化更快。
注意:仅支持主键表的主键关联场景,如果关联 Key 不是主键,则无法使用。
全部缓存会批量将 Paimon 表数据全部 Load 到 RocksDB 中,这样能够在关联 Key 非主键的情况下,能够 Lookup 成功:
通过参数 'lookup.cache'='full'
来开启
全部缓存模式,支持主键表的主键关联和非主键关联两种模式,也支持非主键表的关联。但是劣势是初始化加载时间较长,冷启动现象明显。
火山引擎的 Paimon 版本为社区提供了 Bucket Shuffle 功能,极大地提升了 Lookup Join 大规模维表时候的性能。Bucket Shuffle 的原理如下:
可以看出,在开启 Bucket Shuffle 的 Lookup Join 过程中,主数据会根据 Join Key 进行 Hash 分组处理,这样在每个分组中只要缓存对应 Bucket 数据,大大减少了内存用量,减少了缓存淘汰的概率。可以支持更大规模的维表。开启方法如下,在 hint 中设置 'shuffle' = 'true'
:
-- 使用 hint 选择最新分区 SELECT /*+ LOOKUP('table'='orders', 'shuffle'='true') */ gen.product_id, gen.product_name, gen.product_category_id, gen.product_order_id, gen.product_status, gen.create_date, orders.order_name, orders.order_customer_id, order_status FROM datagen_source AS gen JOIN `paimon_dim`.`dim_test`.`dim_orders` FOR SYSTEM_TIME AS OF gen.create_date AS orders ON gen.product_order_id = orders.order_id;
适用场景:
以下是常见的维表的建表 WITH 参数设置:
参数 | 说明 | 数据类型 | 必填 | 默认值 | 备注 |
---|---|---|---|---|---|
lookup.cache-max-memory-size | Paimon维表的内存缓存大小。 | String | 否 | 256 MB | 该参数值会同时影响维表缓存大小和lookup changelog-producer的缓存大小,两个机制的缓存大小都由该参数配置。 |
lookup.cache | Paimon维表缓存模式 | Enum | 否 | AUTO | 参数支持 AUTO, FULL两个枚举值, 当满足条件时会优先使用Partial Cache模式 |
lookup.continuous.discovery-interval | 维表数据刷新间隔 | Duration | 否 | 10s | 指定维表增量数据刷新间隔 |
lookup.cache-max-disk-size | 维表本地缓存最大空间占用 | MemorySize | 否 | 无限制 | 指定维表和Lookup Compaction本地磁盘空间占用上限, 目前仅适用于Partial Cache场景 |
lookup.local-file-type | 维表文件类型 | Enum | 否 | HASH | 仅针对于Partial Cache场景, 支持 HASH 和 SORT 两种模式. Sort模式初始化更快 |