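For materialized views, it is recommended to use local tables as both the source table and the target table. Data is then processed only on the local node, which reduces data exchange between nodes during processing. In this setup, a Distributed table must also be created on top of the materialized view's target table so that all data can be queried across nodes.
Basic workflow:
Sample SQL statements: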
-- 1. Structure of the local source table, plus its Distributed table
CREATE TABLE default.ods_test_local ON CLUSTER bytehouse
(
    id Int8,
    pDate Date,
    describtion String
)
ENGINE = HaMergeTree()
ORDER BY id;

CREATE TABLE default.ods_test ON CLUSTER bytehouse AS default.ods_test_local
ENGINE = Distributed('bytehouse', `default`, `ods_test_local`, rand());

-- 2. Create the target table (local table, plus its Distributed table)
CREATE TABLE default.dm_test_local ON CLUSTER bytehouse
(
    id Int8,
    pDate Date
)
ENGINE = HaMergeTree()
ORDER BY id;

CREATE TABLE default.dm_test ON CLUSTER bytehouse AS default.dm_test_local
ENGINE = Distributed('bytehouse', `default`, `dm_test_local`, rand());

-- 3. Create the materialized view (local source table to local target table)
CREATE MATERIALIZED VIEW default.ods2dm_test_mv ON CLUSTER bytehouse
TO default.dm_test_local
AS SELECT id, pDate
FROM default.ods_test_local;

-- 4. Insert data into the source table, then check the target table
INSERT INTO default.ods_test VALUES (1, '2024-01-01', ''), (2, '2024-01-02', '');

SELECT * FROM default.dm_test LIMIT 10;
-- The data can be queried:
-- 1, '2024-01-01'
-- 2, '2024-01-02'
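One rough way to see that the view writes only to the shard-local target table is to count rows per node through the Distributed table. This is a minimal sketch, assuming ByteHouse exposes the ClickHouse-style `hostName()` function (an assumption, not something this page confirms):

```sql
-- Assumption: hostName() is available, as in ClickHouse.
-- Each shard evaluates hostName() locally, so the result shows
-- how many rows of the target table each responding node holds.
SELECT hostName() AS node, count() AS row_count
FROM default.dm_test
GROUP BY node
ORDER BY node;
```

With the `rand()` sharding key on `default.ods_test`, the inserted rows should end up spread across shards, and each node's count reflects only the rows its own local view processed.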
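With a materialized view, data consumed by a HaKafka engine table can be synchronized automatically into a local table, which implements Kafka import.
The basic architecture is shown below:
When a Kafka import task is created in the management console, the statements are generated automatically from this example and displayed in the UI.
An example of the SQL operations: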
-- Create the HaKafka table
CREATE TABLE TEST.CONSUMER_tce_service_resource_usage_local ON CLUSTER bytehouse
(
    `current_ts` Int64,
    `psm` String,
    `cpu_limit_pod` Float64
)
ENGINE = HaKafka()
SETTINGS
    kafka_broker_list = 'xxx.xxx.xxx.xxx',     -- replace with the Kafka broker
    kafka_topic_list = 'xxx_topic',            -- replace with the business topic
    kafka_group_name = 'xxx_group_name',       -- replace with a business-related group name
    kafka_format = 'JSONEachRow',              -- usually JSON
    kafka_row_delimiter = '\n',                -- usually \n
    kafka_leader_priority = '{replica_num}',   -- fixed parameter, do not modify
    kafka_partition_num = '{shard_index}',     -- fixed parameter, do not modify
    kafka_shard_count = '{shard_count}';       -- fixed parameter, do not modify

-- Prepare a HaMergeTree table to store the data
CREATE TABLE TEST.tce_service_resource_usage_local ON CLUSTER bytehouse
(
    `current_ts` Int64,
    `psm` String,
    `cpu_limit_pod` Float64
)
ENGINE = HaMergeTree()
ORDER BY psm
PARTITION BY toDate(current_ts);

CREATE TABLE TEST.tce_service_resource_usage ON CLUSTER bytehouse AS TEST.tce_service_resource_usage_local
ENGINE = Distributed('bytehouse', 'TEST', 'tce_service_resource_usage_local');

-- Create a materialized view that SELECTs data from the HaKafka table
-- and writes it into the HaMergeTree table
CREATE MATERIALIZED VIEW TEST.VIEW_tce_service_resource_usage_local ON CLUSTER bytehouse
TO TEST.tce_service_resource_usage_local
AS SELECT current_ts, psm, cpu_limit_pod
FROM TEST.CONSUMER_tce_service_resource_usage_local;
After that, querying the Distributed table over the target table is enough to check consumption.
SELECT * FROM TEST.tce_service_resource_usage LIMIT 10;
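Beyond sampling a few rows, counting rows per day gives a quick view of whether consumption is progressing. The query below is a sketch only; it assumes `current_ts` holds a Unix timestamp in seconds, which is consistent with the `toDate(current_ts)` partition expression above but is not stated explicitly.

```sql
-- Assumption: current_ts is a Unix timestamp in seconds.
-- Row counts for the most recent days, read across all shards
-- through the Distributed table.
SELECT toDate(current_ts) AS day, count() AS row_count
FROM TEST.tce_service_resource_usage
GROUP BY day
ORDER BY day DESC
LIMIT 7;
```

Running it a few times should show the latest day's count growing while the HaKafka table is consuming.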
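HaAggregatingMergeTree and HaSummingMergeTree are MergeTree engine types with built-in aggregation. Used as the target of a materialized view, they aggregate the source table's data and synchronize the aggregated result to the target table, which speeds up aggregate queries.
Because queries then read pre-aggregated rows rather than raw data, they take less time and fewer resources, and the benefit is most noticeable on large datasets.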
CREATE MATERIALIZED VIEW agg_test_mv ON CLUSTER bytehouse
ENGINE = HaAggregatingMergeTree()
ORDER BY (p_date, name)
PARTITION BY p_date
AS SELECT
    p_date,
    name,
    avgState(age) AS age_avg
FROM agg_test_source_local
WHERE gender = 'male'
GROUP BY p_date, name;

-- After the materialized view is created and the table list is refreshed, a table named
-- .inner_id.24c211a5-fec3-45a3-afed-e3d3cf46a1ea appears; this is the target table.
-- A Distributed table must then be created on top of it:
CREATE TABLE agg_test_target ON CLUSTER bytehouse AS default.`.inner_id.24c211a5-fec3-45a3-afed-e3d3cf46a1ea`
ENGINE = Distributed('bytehouse', 'default', '.inner_id.24c211a5-fec3-45a3-afed-e3d3cf46a1ea', rand());
-- Alternatively, create the target table explicitly and point the view at it with TO
CREATE TABLE agg_test_target_local ON CLUSTER bytehouse
(
    p_date Date,
    name String,
    age_avg AggregateFunction(avg, Int64)
)
ENGINE = HaAggregatingMergeTree()
ORDER BY (p_date, name)
PARTITION BY p_date;

CREATE TABLE agg_test_target ON CLUSTER bytehouse AS agg_test_target_local
ENGINE = Distributed('bytehouse', 'default', 'agg_test_target_local', rand());

CREATE MATERIALIZED VIEW agg_mv_test ON CLUSTER bytehouse
TO agg_test_target_local
AS SELECT
    p_date,
    name,
    avgState(age) AS age_avg
FROM test
WHERE gender = 'male'
GROUP BY p_date, name;
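Note
Aggregate functions must be explicitly rewritten as xxxState. The State variant stores the intermediate aggregation state so that data can be merged later; here, avg() is rewritten as avgState(). Likewise, when querying the materialized view, the function must be explicitly rewritten as xxxMerge(), for example:
SELECT name, avgMerge(age_avg) FROM agg_mv_test GROUP BY name;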
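Since the view writes into the local target table, reading the aggregated result from all nodes goes through the Distributed table. A minimal sketch, assuming `agg_test_target` is the Distributed table created earlier over the target table:

```sql
-- avgMerge() combines the partial avg states stored on every shard
-- into a final average per name.
SELECT name, avgMerge(age_avg) AS age_avg
FROM agg_test_target
GROUP BY name;
```

The `GROUP BY` is still required at query time: rows for the same key may not have been merged yet, and the same name can appear on several shards.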
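Materialized views do not support modifying columns directly. The recommended approach is to drop the view and recreate it with the new definition.
For example, to add the "gender" and "profile" columns from the source table "test" to the materialized view "agg_mv_test", together with an aggregate metric for total age, proceed as follows: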
DROP TABLE agg_mv_test ON CLUSTER bytehouse SYNC;

CREATE MATERIALIZED VIEW agg_mv_test ON CLUSTER bytehouse
ENGINE = HaAggregatingMergeTree()
-- The new grouping columns are part of the sorting key, so rows that differ
-- in gender or profile are not collapsed into one row during merges.
ORDER BY (p_date, name, gender, profile)
PARTITION BY p_date
AS SELECT
    p_date,
    name,
    gender,
    profile,
    avgState(age) AS age_avg,
    sumState(age) AS age_sum
FROM test
WHERE gender = 'male'
GROUP BY p_date, name, gender, profile;
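When users query the source table of a materialized view directly, setting enable_view_based_query_rewrite = 1 turns on automatic rewriting. The query is then matched against the corresponding materialized view and accelerated automatically. For example:
SELECT name, age
FROM test
WHERE p_date = '2019-01-01' AND gender = 'male'
SETTINGS enable_view_based_query_rewrite = 1;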
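If the target table is of type HaAggregatingMergeTree, there is no need to specify xxxMerge in the query. Standard aggregate functions can be used directly and the system rewrites them automatically. For example:
SELECT
    p_date,
    name,
    avg(age) AS age_avg
FROM test
WHERE gender = 'male'
GROUP BY p_date, name
SETTINGS enable_view_based_query_rewrite = 1;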
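For comparison, the hand-written form that reads the materialized view directly would look roughly like the sketch below. It assumes the query above is matched to `agg_mv_test`; how ByteHouse performs the rewrite internally is not described here, so this is an illustration of the intent rather than the actual rewritten query.

```sql
-- Manual equivalent: read the pre-aggregated states from the view
-- and finalize them with avgMerge().
SELECT
    p_date,
    name,
    avgMerge(age_avg) AS age_avg
FROM agg_mv_test
GROUP BY p_date, name;
```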