Flink SQL 任务中的实时维表打宽是一种非常常见的流式计算场景,通过实时数据流与外部维表进行实时关联,无需额外状态管理,语义简单且支持高 QPS 吞吐。常见的维表查询场景包含但不限于:
维表打宽一般需要引入高 QPS 点查的远端存储。在某些业务场景中,维表数据还需支持下游 OLAP 分析需求。ByteHouse 作为基于高性能 OLAP 查询引擎,能在提供下游业务查询的同时,提供非常好的支撑秒级延迟的维表查询的场景。
使用 ByteHouse-CDW 表直接作为维表的优势在于:
注意:使用此项功能,请确保 Flink 使用 1.16 以上版本,并且 ByteHouse-CDW Connector 在1.27.109 以上版本。
在如下 SQL 中,我们构建了一张 dim_orders 表作为订单信息的维表。其中订单表的主键和分桶键都是 order_id ,即订单 id。我们后续在实时维表打宽作业的过程中会基于订单 id 进行关联,获取订单的关联信息。
-- 创建维表数据表 CREATE TABLE IF NOT EXISTS `dim_orders` ( `order_id` INT, `order_name` STRING, `order_product_id` INT, `create_date` TIMESTAMP, PRIMARY key (`order_id`) NOT ENFORCED ) WITH ( 'connector' = 'bytehouse-cdw', 'jdbc.enable-gateway-connection' = 'true', -- ByteHouse-CDW 连接信息 'bytehouse.gateway.region' = 'VOLCANO_PRIVATE', 'bytehouse.gateway.host' = '${secret_values.bh-host}', 'bytehouse.gateway.port' = '19000', 'bytehouse.gateway.api-token' = '${secret_values.bh-api-key}', 'bytehouse.gateway.virtual-warehouse' = '${secret_values.bh-vw}', -- 维表查询参数 'lookup.async.scale-factor' = '16', -- SYNC mode 'lookup.cache.max-rows' = '10000', -- set '0' to disable caching 'lookup.cache.ttl' = '1 hour', -- ByteHouse 数据库表 'database' = 'bh_db', 'table-name' = 'dim_orders' );
首先我们使用 datagen
模拟实时数据流。注意这里采用 create_date
作为处理时间。
-- 模拟数据流 CREATE TABLE datagen_source ( `product_id` INT, `product_name` STRING, `product_order_id` INT, `create_date` AS proctime () ) WITH ('connector' = 'datagen'); -- 实时数据打宽后的下游宽表,测试中使用 print 打印到控制台 CREATE TABLE print_sink ( `product_id` INT, `product_name` STRING, `product_order_id` INT, `create_date` TIMESTAMP, `order_name` STRING ) WITH ('connector' = 'print');
以下 SQL 语句定义了一个实时打宽的视图 trade_orde_view
,通过将流式数据源 datagen_source
与 ByteHouse-CDW 维表 dim_orders
进行关联,实现数据的实时打宽。
-- 实时打宽后过滤 order_name 不为空的数据写入下游 INSERT INTO print_sink SELECT gen.product_id, gen.product_name, gen.product_order_id, gen.create_date, orders.order_name FROM datagen_source AS gen JOIN `dim_orders` FOR SYSTEM_TIME AS OF gen.create_date AS orders ON gen.product_order_id = orders.order_id WHERE orders.order_name IS NOT NULL;
其中维表关联的具体逻辑解释如下:
dim_orders
是上文中定义的 ByteHouse-CDW 维表。FOR SYSTEM_TIME AS OF gen.create_date
表示根据流数据中的 create_date
字段,查找维表在 create_date
时刻的快照数据。gen.product_order_id = orders.order_id
表示将流数据中的 product_order_id
与维表中的 order_id
进行关联。ByteHouse CDW维表lookup支持同步模式和异步模式,并且支持多种缓存策略(全量缓存、部分缓存、无缓存)。实践上的选型主要需要在维表实时性和查询延迟/吞吐之间进行权衡,根据业务特点选择合适的维表关联方式。
lookup.async.scale-factor
大于 1。
lookup.cache.max-rows
为 -1。
lookup.cache.max-rows
大于 0。
lookup.cache.max-rows
等于 0。
以下是常见的维表的建表 WITH 参数设置:
参数 | 必选 | 默认值 | 值类型 | 说明 |
---|---|---|---|---|
| 否 |
| Integer | 如果等于1则为同步lookup; |
| 否 |
| Integer | 如果大于0,表示维表cache的最大行数,若超过该值,则最老的行记录将会过期; 如果设置为0,cache则被禁用,所有的查询将运行在Bytehouse Server,最大程度保证维表数据新鲜度; |
| 否 |
| Duration | 在记录写入缓存后该记录的最大保留时间。 |
| 否 |
| Integer | 查询数据库失败的最大重试次数。 |
| 否 |
| Boolean | 是否跳过维表中是空值的key,最大程度去join上非空值。 |