You need to enable JavaScript to run this app.
导航
ByteHouse 维表打宽功能
最近更新时间:2025.03.28 15:32:05首次发布时间:2025.03.28 15:32:05
我的收藏
有用
有用
无用
无用

1. 概述

1.1 维表查询

Flink SQL 任务中的实时维表打宽是一种非常常见的流式计算场景,通过实时数据流与外部维表进行实时关联,无需额外状态管理,语义简单且支持高 QPS 吞吐。常见的维表查询场景包含但不限于:

  1. 用户行为分析:用户行为日志关联用户的用户名、基础信息、历史行为等。
  2. 实时推荐系统:用户的实时操作与推荐模型中的特征数据进行关联,生成个性化推荐结果。
  3. 金融风控:交易数据与风险控制规则库进行关联,实时检测和预警异常交易行为。
  4. 物联网监控:设备数据与设备配置或状态信息进行关联,实时监控设备运行情况并及时发现故障。

1.2 ByteHouse-CDW 作为维表优势

维表打宽一般需要引入高 QPS 点查的远端存储。在某些业务场景中,维表数据还需支持下游 OLAP 分析需求。ByteHouse 作为基于高性能 OLAP 查询引擎,能在提供下游业务查询的同时,提供非常好的支撑秒级延迟的维表查询的场景。

Image

使用 ByteHouse-CDW 表直接作为维表的优势在于:

  • 性能良好:可以支撑高 QPS 的点查能力,可以支持秒级实时性维表服务
  • 场景丰富:支持业务查询和点查维表能力,简化维表维护链路
  • 成本较低:ByteHouse-CDW (云数仓版)支持存算分离,具有超高的弹性和较低存储成本。

2. 基础示例

注意:使用此项功能,请确保 Flink 使用 1.16 以上版本,并且 ByteHouse-CDW Connector 在1.27.109 以上版本。

2.1 创建维表

在如下 SQL 中,我们构建了一张 dim_orders 表作为订单信息的维表。其中订单表的主键和分桶键都是 order_id ,即订单 id。我们后续在实时维表打宽作业的过程中会基于订单 id 进行关联,获取订单的关联信息。

-- 创建维表数据表
CREATE TABLE IF NOT EXISTS
  `dim_orders` (
    `order_id` INT,
    `order_name` STRING,
    `order_product_id` INT,
    `create_date` TIMESTAMP,
    PRIMARY key (`order_id`) NOT ENFORCED
  )
WITH
  (
    'connector' = 'bytehouse-cdw',
    'jdbc.enable-gateway-connection' = 'true',
    -- ByteHouse-CDW 连接信息
    'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
    'bytehouse.gateway.host' = '${secret_values.bh-host}',
    'bytehouse.gateway.port' = '19000',
    'bytehouse.gateway.api-token' = '${secret_values.bh-api-key}',
    'bytehouse.gateway.virtual-warehouse' = '${secret_values.bh-vw}',
    -- 维表查询参数
    'lookup.async.scale-factor' = '16', -- SYNC mode
    'lookup.cache.max-rows' = '10000', -- set '0' to disable caching
    'lookup.cache.ttl' = '1 hour',
    -- ByteHouse 数据库表
    'database' = 'bh_db',
    'table-name' = 'dim_orders'
  );

2.2 模拟实时数据流

首先我们使用 datagen模拟实时数据流。注意这里采用 create_date作为处理时间。

-- 模拟数据流
CREATE TABLE
  datagen_source (
    `product_id` INT,
    `product_name` STRING,
    `product_order_id` INT,
    `create_date` AS proctime ()
  )
WITH
  ('connector' = 'datagen');
  
-- 实时数据打宽后的下游宽表,测试中使用 print 打印到控制台
CREATE TABLE
  print_sink (
    `product_id` INT,
    `product_name` STRING,
    `product_order_id` INT,
    `create_date` TIMESTAMP,
    `order_name` STRING
  )
WITH
  ('connector' = 'print');

2.4 实时 lookup join

以下 SQL 语句定义了一个实时打宽的视图 trade_orde_view,通过将流式数据源 datagen_source 与 ByteHouse-CDW 维表 dim_orders 进行关联,实现数据的实时打宽。

-- 实时打宽后过滤 order_name 不为空的数据写入下游
INSERT INTO
  print_sink
SELECT
  gen.product_id, gen.product_name, 
  gen.product_order_id, gen.create_date,
  orders.order_name
FROM
  datagen_source AS gen
  JOIN `dim_orders` 
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id
WHERE
  orders.order_name IS NOT NULL;  

其中维表关联的具体逻辑解释如下:

  • 维表dim_orders 是上文中定义的 ByteHouse-CDW 维表。
  • 时间语义FOR SYSTEM_TIME AS OF gen.create_date 表示根据流数据中的 create_date 字段,查找维表在 create_date 时刻的快照数据。
  • 关联条件gen.product_order_id = orders.order_id 表示将流数据中的 product_order_id 与维表中的 order_id 进行关联。

3. 调优技巧

ByteHouse CDW维表lookup支持同步模式和异步模式,并且支持多种缓存策略(全量缓存、部分缓存、无缓存)。实践上的选型主要需要在维表实时性查询延迟/吞吐之间进行权衡,根据业务特点选择合适的维表关联方式。

  • **异步lookup:**设置 lookup.async.scale-factor 大于 1。
    • 适用于维表实时性高(维表更新实时可见)、查询吞吐中等(~2k QPS)、大维表(2千万行以上)的场景。 数据新鲜度高,维表join性能高。
  • **全量缓存lookup:**设置lookup.cache.max-rows为 -1。
    • 适用于维表实时性中等(定时同步维表更新,如每分钟)、查询吞吐高(~5万QPS)、小维表(50万行以下)的场景。 数据新鲜度低,维表join性能高。
  • **部分缓存lookup:**设置 **** lookup.cache.max-rows 大于 0。
    • 适用于维表实时性较高(缓存驱逐策略可自定义,如每30秒或1千行LRU)、查询吞吐高(~2万QPS)、维表查询服从正态分布(比如90%查询命中10%数据)的场景。 数据新鲜度中,维表join性能中。
  • 实时 lookup:设置 **** lookup.cache.max-rows 等于 0。
    • 适用于维表实时性极其严苛(维表更新立刻可见)、查询吞吐较小(小于 1k QPS)、小维表的场景。 数据新鲜度高。

4. 参数详解

以下是常见的维表的建表 WITH 参数设置:

参数

必选

默认值

值类型

说明

lookup.async.scale-factor

1

Integer

如果等于1则为同步lookup;
如果大于1则为异步lookup,且此值是单个subtask的lookup并发数。

lookup.cache.max-rows

0

Integer

如果大于0,表示维表cache的最大行数,若超过该值,则最老的行记录将会过期;

如果设置为0,cache则被禁用,所有的查询将运行在Bytehouse Server,最大程度保证维表数据新鲜度;
如果设置-1,整个表的数据都被cache到内存,最大程度保证性能,但是需要注意内存使用过多问题。

lookup.cache.ttl

none

Duration

在记录写入缓存后该记录的最大保留时间。

lookup.max-retries

3

Integer

查询数据库失败的最大重试次数。

lookup.cache.skip-missing-key

false

Boolean

是否跳过维表中是空值的key,最大程度去join上非空值。