You need to enable JavaScript to run this app.
导航
物化视图最佳实践
最近更新时间:2024.09.20 18:02:26首次发布时间:2024.09.18 17:43:12

物化视图与分布式查询

对于物化视图,建议将本地表作为源表和目标表,如此可使数据仅在本地进行处理,从而减少加工过程中节点间的数据交换。在此情况下,为了能够跨节点查询所有数据,在物化视图的目标表上也需建立分布式表。
图片

基本操作流程:

  1. 新建源数据表,包括本地表和分布式表。
  2. 新建目标表,包括本地表和分布式表。
  3. 基于本地表新建物化视图。
  4. 源表(分布式表)插入数据。

以下为 SQL 语句样例:

--1. 源表的本地表表结构如下
Create Table default.ods_test_local on cluster bytehouse(
    id Int8,
    pDate Date,
    describtion String
)Engine=HaMergeTree()
Order By id;

CREATE TABLE default.ods_test ON CLUSTER bytehouse
   AS default.ods_test_test_local
   ENGINE = Distributed('bytehouse',`default`,`ods_test_local`,rand())

--2. 新建目标表
CREATE TABLE default.dm_test_local ON CLUSTER bytehouse(
    id Int8,
    pDate Date
) ENGINE = HaMergeTree()
ORDER BY id

CREATE TABLE default.dm_test ON CLUSTER bytehouse
   AS default.dm_test_local
   ENGINE = Distributed('bytehouse',`default`,`dm_test_local`,rand())

--3. 新建物化视图
CREATE MATERIALIZED default.ods2dm_test_mv ON CLUSTER bytehouse
TO default.dm_test_local
AS SELECT id, pDate FROM ods_test_local;

--4. 源表插入数据,测试目标表
INSERT INTO default.ods_test VALUES (1,'2024-1-1',''),(2,'2024-1-2','')
SELECT * from default.dm_test LIMIT 10;

--可以查询到数据
1,'2024-1-1'
2,'2024-1-2'

HaKafka 导入

通过物化视图,能够将 HaKafka 表引擎自动同步至本地表,从而实现 Kafka 的导入。
基本架构图如下:
图片
在管控界面中新建 Kafka 导入任务 ,即是以本示例为基础,自动生成并在界面上展示。
SQL 操作的示例如下:

-- 新建HaKafka表
CREATE TABLE TEST.CONSUMER_tce_service_resource_usage_local ON CLUSTER bytehouse (
    `current_ts` Int64,
    `psm` String,
    `cpu_limit_pod` Float64
) ENGINE = HaKafka()
SETTINGS 
kafka_broker_list = 'xxx.xxx.xxx.xxx',  -- 替换成kafka broker
kafka_topic_list = 'xxx_topic', -- 替换成业务的topic
kafka_group_name = 'xxx_group_name', -- 替换成业务相关的 group_name
kafka_format = 'JSONEachRow', -- 一般用json
kafka_row_delimiter = '\n', -- 一般是 \n
kafka_leader_priority = '{replica_num}', -- 固定参数,勿修改
kafka_partition_num = '{shard_index}', -- 固定参数,勿修改
kafka_shard_count = '{shard_count}'; -- 固定参数,勿修改

--- 准备一张存储数据的HaMergeTree表
CREATE TABLE TEST.tce_service_resource_usage_local ON CLUSTER bytehouse (
    `current_ts` Int64,
    `psm` String,
    `cpu_limit_pod` Float64
) ENGINE = HaMergeTree()
ORDER BY psm
PARTITION BY toDate(current_ts) 

CREATE TABLE TEST.tce_service_resource_usage ON CLUSTER bytehouse 
AS TEST.tce_service_resource_usage_local ENGINE = Distributed('ByteHouse','TEST','tce_service_resource_usage_local');

--- 再建一张物化视图,表示把数据从HaKafka表中SELECT出来写入到HaMergeTree表
CREATE MATERIALIZED VIEW TEST.VIEW_tce_service_resource_usage_local ON CLUSTER ByteHouse
TO TEST.tce_service_resource_usage_local
AS SELECT current_ts, psm, cpu_limit_pod FROM TEST.CONSUMER_TEST_tce_service_resource_usage_local

之后只需要查询目标表的分布式表即可查看消费情况。

Select * from TEST.tce_service_resource_usage limit 10;

聚合表引擎

HaAggregatingMergeTree 和 HaSummingMergeTree 均为具有聚合功能的 MergeTree引擎类型。这两种类型能够对源表的数据进行聚合操作,随后将聚合后的数据同步至目标表,从而达成聚合查询加速的显著效果。
在实际应用中,通过利用它们的聚合特性,可以有效地提升数据查询的效率和性能。尤其是在处理大规模数据集合时,这种聚合功能可以显著减少查询所需的时间和资源消耗。

方式1:自动创建目标表

CREATE MATERIALIZED VIEW agg_test_mv ON CLUSTER bytehouse 
    ENGINE = HaAggregatingMergeTree() 
    ORDER BY (p_date, name)
    PARTITION BY p_date 
    AS SELECT p_date, name, avgState(age) as age_avg
        FROM agg_test_source_local WHERE gender = 'male' GROUP BY p_date, name

--新建物化视图后同步库表,可见到生成了一张名为 .inner_id.24c211a5-fec3-45a3-afed-e3d3cf46a1ea 的表为目标表;需要在目标表的基础上,再建 distributed 表;
CREATE TABLE agg_test_target ON CLUSTER byteHouse
    as default.`.inner_id.24c211a5-fec3-45a3-afed-e3d3cf46a1ea`
    engine = Distributed('bytehouyse','default','.inner_id.24c211a5-fec3-45a3-afed-e3d3cf46a1ea',rand());

方式2:手动建目标

CREATE TABLE agg_test_target_local ON CLUSTER bytehouse 
    p_date Date,
    name String,
    age_avg AggregateFunction(avg, Int64) --
) engine = HaAggregatingMergeTree()
ORDER BY (p_date, name)
PARTITION BY p_date;

CREATE TABLE agg_test_target ON CLUSTER bytehouse 
    AS agg_test_target_local
    engine = Distributed('bytehouse','default','agg_test_target_local',rand());

CREATE MATERIALIZED VIEW agg_mv_test ON CLUSTER bytehouse 
    to agg_test_target_local
    AS SELECT p_date, name, avgState(age) as age_avg
        FROM test WHERE gender = 'male' GROUP BY p_date, name;

注意

对于聚合函数,需要显示改写为 xxxState,该函数用于保存中间状态,便于后续合并数据,如此处 avg() 改写为了 avgState()。同时,查询该物化视图时,也需要显示改写为 xxxMerge(),如:

SELECT name, avgMerge(age_avg) from agg_mv_test GROUP BY name;

更改物化视图结构

物化视图不支持直接对列进行修改。在此情况下,建议按照以下步骤进行操作:

  1. 删除物化视图。
  2. 若在手动指定了目标表的前提下,对目标表执行 Truncate 操作,并调整目标表的表结构。
  3. 重建物化视图。

例如,现在若想给名为 “agg_mv_test” 的物化视图加上源表 “test” 中的 “gender” 和 “profile” 属性,并且再加上计算总年龄的聚合指标,则可按如下步骤进行操作:

DROP TABLE agg_mv_test on cluster bytehouse sync;

CREATE MATERIALIZED VIEW agg_mv_test on cluster bytehouse 
    ENGINE = HaAggregatingMergeTree() 
    ORDER BY (p_date, name)
    PARTITION BY p_date 
    AS SELECT p_date, name, gender, profile, avgState(age) as age_avg, sumState(age) as age_sum FROM test WHERE gender = 'male' GROUP BY p_date, name, gender, profile;

自动改写

当用户直接查询物化视图的源表时,可以通过设置参数“enable_view_based_query_rewrite = 1”来实现自动改写。如此设置后,查询会自动匹配对应的物化视图,从而实现自动加速。例如:

SELECT name, age from test WHERE p_date = '2019-01-01' AND gender = 'male' SETTINGS enable_view_based_query_rewrite = 1

如果目标表是 HaAggregatingMergeTree 类型,在查询时无需指定“xxxMerge”,直接使用标准聚合函数,系统也会自动进行改写。示例如下:

SELECT p_date, name, avg(age) as age_avg FROM test WHERE gender = 'male' GROUP BY p_date, name SETTINGS enable_view_based_query_rewrite = 1;