You need to enable JavaScript to run this app.
导航
异步物化视图
最近更新时间:2024.09.02 15:52:09首次发布时间:2024.09.02 15:52:09

基本概念

  • 基表(Base Table)- 物化视图的驱动表, 异步物化视图支持join,subquery多表的关联查询。
  • 目的表(Target Table)- 物化视图实际存储的表,一般是AggregateMergeTree 或者MergeTree
  • 物化视图 (Materialized View) - 逻辑概念定义基表到目的表的映射关系
  • 刷新 (Refresh) - 创建物化视图后,其中的数据仅反映创建时刻基表的状态。当基表中的数据发生变化时,需要通过刷新通过视图从基表执行查询映射到目的表。
  • 查询改写(Query Rewrite)- 查询改写是指在对已构建了物化视图的基表进行查询时,系统自动判断是否可以复用物化视图中的预计算结果处理查询。如果可以复用,系统会直接从相关的物化视图读取预计算结果,以避免重复计算消耗系统资源和时间。

使用场景

目前开源ClickHouse的物化视图是同步视图,对于使用场景有如下限制:

  • 影响基表数据导入性能。
  • 支持单表,无法支持多表关联。

为了更好的支持多表关联的场景,减少对于数据导入影响,一个比较好的解决方法是使用异步视图,其用途主要包括:

  • 复杂查询加速: 异步物化视图存储了基于基表特定查询语句的预计算结果。对基表执行复杂查询时,可以直接复用预计算结果,避免重复计算,进而提高查询性能。查询的频率越高或查询语句越复杂,性能增益就会越很明显。
  • 数据加工:通过异步物化视图可以对底层数据进行ELT做轻量级的数据加工,直接在ClickHouse引擎内容实现数据的转化,减少在不同的数据处理系统切换,降低使用门槛和维护成本。

视图类型

单表聚合

多表关联

查询改写

刷新策略

数据导入影响

异步物化视图

异步刷新
手动刷新

同步物化视图

导入同步刷新

异步物化视图优势

  • 由于定期异步刷新,不会影响基表的数据写入链路,提高基表写入性能。
  • 视图如果是分区对齐的方式,会探测基表数据变化,按照分区进行刷新,减少刷新代价。
  • 基表和目的表分区可以进行表达式转化,不要求必须一致。
  • 支持复杂多表join查询,hive外表查询,unique table,物化视图级联,普通视图等多种物化视图。
  • 查询结果强一致性,原始查询与改写后的物化表查询结果保证一致。
  • 对于特定算子(sum, min, max...)存放值而不是二进制中间状态数据,避免二次merge造成的性能开销。
  • 异步视图的目的表为MergeTree类型而非AggregatingMergeTree, 使用灵活性更高。
  • 完善的任务执行状态监控。

基本语法

--建异步视图
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [database.]<mv_name> to [database.]<target_name> [REFRESH [ASYNC [START (start_time)] EVERY (INTERVAL refresh_interval) |  MANUAL]] AS <query_statement> properties <mv_properties>;

--系统命令,启动,停止视图刷新,激活,失效视图
SYSTEM [START VIEW | STOP VIEW | ACTIVATE VIEW | DEACTIVATE VIEW] [database.]<mv_name>;

--手动刷新视图
REFRESH MATERIALIZED VIEW [database.]<mv_name>;

--修改视图属性
ALTER TABLE [database.]<mv_name> MODIFY PROPERTY ...;

注意事项

  • refresh_interval只接受间隔n天/小时/分钟/秒。
  • start_time是可选的,必须是YYYY-mm-dd HH:MM:SS格式。
  • REFRESH 支持ASYNC (自动刷新),MANUAL(手动刷新)。
  • ASYNC模式下基表类型只允许 distributed table,hive cluster table, materiaizlized view,必须到目的表,带to关键字。
  • SYSTEM START VIEW | STOP VIEW 可以在集群任何节点执行。
  • SYSTEM ACTIVATE VIEW | DEACTIVATE VIEW 需要在集群所有节点执行,使视图失效会停止刷新,停止查询改写。
  • MODIFY PROPERTY 需要在各个节点执行,active,base_tables不允许修改,修改会有报错提醒。

物化视图属性

  • base_tables - 视图依赖的基表列表,格式:'db.table_1, db_table_2,...' (创建表时自动添加)。
  • active - 是否是有效的状态,false-不参与改写,不刷新,true-参与改写,刷新(创建表时自动添加)。
  • async_mv_enable_trigger_all_mode - 定义视图刷新的触发模式。
    • false ( 弱依赖,有一个基表更新就会触发物化视图更新)。
    • true (强依赖,基表全部更新才触发物化视图更新)。
  • async_mv_excluded_trigger_tables - 可选依赖,配置基表 更新不会触发物化视图更新,格式:'db.table_1, db_table_2,...'。
  • async_mv_refresh_execution_time : 刷新任务最大执行时间。
  • async_mv_partition_refresh_max_limit: 限定探测原表变化最大分区范围数量,分区字段允许如下方式, (date), ('date'), (date, hour), ('date', hour), ('date', 'hour'), eg. 原表分区date, async_mv_partition_refresh_max_limit = 10, 会在分区映射计算构造分区过滤条件: p_date >= toDate(now()) - 10。
  • async_mv_partition_refresh_min_limit: 限定探测原表变化最小分区范围数量,分区字段允许如下方式, (date), ('date'), (date, hour), ('date', hour), ('date', 'hour'), eg. 原表分区date, async_mv_partition_refresh_min_limit = 1, 会在分区映射计算构造分区过滤条件: p_date <= toDate(now()) - 1, async_mv_partition_refresh_max_limit >= async_mv_partition_refresh_min_limit 两个参数相当于构造了探测. 基表变化的范围,toDate(now()) - async_mv_partition_refresh_max_limit <= p_date >= toDate(now()) - async_mv_partition_refresh_min_limit, 对于实时场景,可以设置async_mv_partition_refresh_min_limit=1,今天的数据变化就不会被感知,频繁触发刷新操作。
  • async_mv_enbale_query_rewrite - 是否开启视图改写 。
  • async_mv_refresh_compute_resource: 配置refresh task计算资源集群名称,用于refresh的查询计算。
  • async_mv_enable_refresh_execute_serial : 确定执行刷新任务是否是串行执行方式还是并发执行方式,默认值-false。
  • async_mv_max_refresh_task_num: 视图刷新任务一次提交任务的最大并行度,默认值-16。
  • async_mv_enable_split_full_refresh: 是否把full refresh的任务拆分成按照partition based的任务,默认是true,默认打开有助于避免大的刷新任务。

异步视图使用实例

两表join异步物化视图的建表语句。

  • 维度表business.goods与事实表business.order_list进行join,聚合计算。
  • 物化表分区进行天到月分区粒度上卷 date_trunc('month', toDate(order.order_date)) as date。
  • 导入数据自动刷新。
  • 原始查询会自动改写命中视图。
---创建数据库
DROP DATABASE IF EXISTS business on cluster test_shard_localhost sync;
CREATE DATABASE IF NOT EXISTS business on cluster test_shard_localhost;

---维度表local表
DROP TABLE IF EXISTS business.goods_local on cluster test_shard_localhost sync;
CREATE TABLE business.goods_local on cluster test_shard_localhost(
    create_date       DateTime,
    series_id         UInt64,
    goods_id          Int64,
    item_name         String,
    price             Float64
) ENGINE = HaMergeTree('/clickhouse/test_shard_localhost/business.goods_local/{shard}', '{replica}') PARTITION BY toDate(create_date) ORDER BY (goods_id, item_name) SETTINGS index_granularity = 8192;

---维度表分布式表
DROP TABLE IF EXISTS business.goods on cluster test_shard_localhost sync;
CREATE TABLE business.goods on cluster test_shard_localhost (
    create_date       DateTime,
    series_id         UInt64,
    goods_id          Int64,
    item_name         String,
    price             Float64
) ENGINE = Distributed('test_shard_localhost', 'business', 'goods_local', intHash64(goods_id));

---事实表local表
DROP TABLE IF EXISTS business.order_list_local on cluster test_shard_localhost sync;
CREATE TABLE business.order_list_local on cluster test_shard_localhost(
    order_id          Int64,
    client_id         Int64,
    goods_id          Int64,
    shop_id           UInt64,
    order_date        DateTime
) ENGINE = HaMergeTree('/clickhouse/test_shard_localhost/business.order_list_local/{shard}', '{replica}') PARTITION BY (toDate(order_date), shop_id) ORDER BY (order_id,goods_id, client_id) SETTINGS index_granularity = 8192;

---事实表分布式表
DROP TABLE IF EXISTS business.order_list on cluster test_shard_localhost sync;
CREATE TABLE business.order_list on cluster test_shard_localhost(
    order_id          Int64,
    client_id         Int64,
    goods_id          Int64,
    shop_id           UInt64,
    order_date        DateTime
) ENGINE = Distributed('test_shard_localhost', 'business', 'order_list_local', intHash64(order_id)); 

--目的表local表
DROP TABLE IF EXISTS business.order_statistics_inner_join_local on cluster test_shard_localhost sync;
CREATE TABLE business.order_statistics_inner_join_local on cluster test_shard_localhost (date Date, encoded_shop_id UInt64, encoded_goods_series_id UInt64, item_id Int64, total_price Float64, max_price Float64) ENGINE = HaMergeTree('/clickhouse/test_shard_localhost/business.order_statistics_inner_join_local/{shard}', '{replica}') PARTITION BY (date, toUInt64(encoded_shop_id + 10)) ORDER BY (date, encoded_shop_id, encoded_goods_series_id, item_id) settings index_granularity = 8192;

--目的表分布式表
DROP TABLE IF EXISTS business.order_statistics_inner_join on cluster test_shard_localhost sync;
CREATE TABLE business.order_statistics_inner_join on cluster test_shard_localhost (date Date, encoded_shop_id UInt64, encoded_goods_series_id UInt64, item_id Int64, total_price Float64, max_price Float64) ENGINE = Distributed('test_shard_localhost', 'business', 'order_statistics_inner_join_local', intHash64(item_id));


--创建异步物化视图
DROP TABLE IF EXISTS business.order_mv_inner_join on cluster test_shard_localhost sync;
CREATE MATERIALIZED VIEW business.order_mv_inner_join on cluster test_shard_localhost TO business.order_statistics_inner_join
REFRESH ASYNC START('2023-12-05 10:00:00') EVERY(INTERVAL 5 SECOND)
AS SELECT
    date_trunc('month', toDate(order.order_date)) as date,
    multiIf(order.shop_id >= 2702, toUInt64(30000), shop_id <= 2701, toUInt64(40000), toUInt64(50000)) as encoded_shop_id,
    multiIf(good.series_id = 1, toUInt64(100), good.series_id = 2, toUInt64(101), good.series_id = 3, toUInt64(103), toUInt64(10)) as encoded_goods_series_id, 
    order.order_id as item_id,
    sum(good.price) as total_price, 
    max(good.price) as max_price
FROM business.order_list as order inner JOIN business.goods as good ON good.goods_id = order.goods_id WHERE date > '2023-04-01' and date <= '2023-12-01' and order.shop_id >= 2702 GROUP BY date, encoded_shop_id, encoded_goods_series_id, item_id;

--导入数据
INSERT INTO business.goods
VALUES ('2023-11-01 08:32:01', 1, 1001,'apple',2366.5), ('2023-11-02 09:13:15', 1, 1002,'pear',829.0),('2023-11-03 15:06:45', 2, 1003,'potato',2238.2), ('2023-11-03 15:23:45', 3, 1004,'waterlemon',232.2),('2023-12-03 16:13:40', 4, 1108,'keybroad',2345.2);

INSERT INTO business.order_list
VALUES
    (10001,201,1001, 2701, '2023-02-16 13:30:01'),
    (10001,301,1002, 2702, '2023-12-17 20:16:15'),
    (10002,233,1002, 2703, '2023-06-20 09:42:02'),
    (10003,453,1003, 2701, '2023-05-23 16:23:34'),
    (10004,201,2001, 2701, '2023-02-16 13:30:01'),
    (10005,201,2001, 2701, '2023-02-16 13:30:01');

--命中视图
SELECT
    date_trunc('month', toDate(order.order_date)) AS date,
    multiIf(order.shop_id >= 2702, toUInt64(30000), shop_id <= 2701, toUInt64(40000), toUInt64(50000)) AS encoded_shop_id,
    multiIf(good.series_id = 1, toUInt64(100), good.series_id = 2, toUInt64(101), good.series_id = 3, toUInt64(103), toUInt64(10)) AS encoded_goods_series_id,
    order.order_id AS item_id,
    sum(good.price) AS total_price,
    max(good.price) AS max_price
FROM business.order_list AS order
INNER JOIN business.goods AS good ON good.goods_id = order.goods_id
WHERE (date > '2023-04-01') AND (date <= '2023-12-01') AND (order.shop_id >= 2702)
GROUP BY
    date,
    encoded_shop_id,
    encoded_goods_series_id,
    item_id ORDER BY date settings enable_optimizer=1;