You need to enable JavaScript to run this app.
导航
参考:物化视图相关系统表
最近更新时间:2024.09.02 15:52:09首次发布时间:2024.09.02 15:52:09

相关系统表

视图状态表 - system.view_tables

定义

CREATE TABLE system.view_tables
(
    `database` String,
    `table` String,
    `uuid` UUID,
    `definition` String,
    `base_table_databases` Array(String),
    `base_table_tables` Array(String),
    `dependent_table_databases` Array(String),
    `dependent_table_tables` Array(String),
    `target_table_database` String,
    `target_table_table` String,
    `partition_diffs` Array(String),
    `latest_visible_partitions` Array(String),
    `previous_partitions` Array(String),
    `refresh_type` String,
    `refresh_start_time` String,
    `refresh_interval` String,
    `active` UInt8,
    `coordinator_address` String,
    `refresh_status` String
)
ENGINE = SystemViewTables

字段

  • database, table, uuid, definition - 视图的数据库,表名,视图创建语句。
  • base_table_databases , base_table_tables - 视图依赖的基表。
  • dependent_table_databases, dependent_table_tables - 视图刷新分区对齐的基表列表。
  • target_table_database, target_table_table - 目的表库表名。
  • partition_diffs - 当前视图刷新本次与上一次的分区不同列表,add_parittions代表当前新增分区,drop_partitions代表当前删除的分区,刷新会根据diff列表构造不同的刷新任务。
  • latest_visible_partitions - 当前视图的partition信息。
  • previous_partitions - 上次刷新存在mysql中的partition信息。
  • refresh_type, refresh_start_time, refresh_interval - 刷新的类型,启动时间,刷新间隔。
  • active - 当前是否是有效的,如果遇到修改基表涉及到视图表,会设置为视图无效状态,不能改写,刷新,并且试图无效状态不可修改,除非删除重建。
  • coordinator_address - 刷新任务的coodrdinator的地址信息。
  • refresh_status - 当前刷新任务是否正在运行,运行的任务数,running, running_tasks。

示例

database:                  business_v2
table:                     goods_mv
uuid:                      6d96110d-931c-4436-b5fa-ee69c981a4f9
definition:                CREATE MATERIALIZED VIEW business_v2.goods_mv UUID '6d96110d-931c-4436-b5fa-ee69c981a4f9' TO business_v2.goods_info (`date` Date, `series_id` UInt64, `goods_id` Int64, `total_price` Float64, `max_price` Float64) REFRESH ASYNC START('2024-04-01 10:00:00') EVERY(INTERVAL 1 MINUTE) AS SELECT toDate(create_date) AS date, series_id, goods_id, sum(price) AS total_price, max(price) AS max_price FROM business_v2.goods WHERE date > '2024-01-01' GROUP BY date, series_id, goods_id properties async_mv_partition_refresh_max_limit = 10
base_table_databases:      ['business_v2']
base_table_tables:         ['goods']
dependent_table_databases: ['business_v2']
dependent_table_tables:    ['goods']
target_table_database:     business_v2
target_table_table:        goods_info
partition_diffs:           ['depend_storage_id: business_v2.goods (ef94f3c8-c8e2-40cf-92dc-0862a0f15a4c), add_partitions: [partition-\'2024-04-14\', versoins-5, -1, ], drop_partitions: []']
latest_visible_partitions: ['business_v2.goods:partition-\'2024-04-14\', versoins-5, -1|partition-\'2024-04-13\', versoins-4, -1']
previous_partitions:       ['business_v2.goods:partition-\'2024-04-13\', versoins-4, -1']
refresh_type:              ASYNC
refresh_start_time:        2024-04-01 10:00:00
refresh_interval:          1 Minute
active:                    1
coordinator_address:       host: 10.232.201.68,exchange_port: 9520,fabric_port: 12399
refresh_status:            running : true, running_tasks: 0

刷新任务状态表-system.view_refresh_task_log

定义

CREATE TABLE system.view_refresh_task_log
(
    `database` String,
    `view` String,
    `status` Enum8('START' = 1, 'FINISH' = 2, 'EXCEPTION_EXECUTE_TASK' = 3, 'EXCEPTION_BEFORE_START' = 4),
    `refresh_type` Enum8('NONE' = 1, 'PARTITION_REFRESH' = 2, 'FULL_REFRESH' = 3),
    `refresh_mode` Enum8('LOCAL_SYNC' = 1, 'GAOBAL_ASYNC' = 2),
    `event_date` Date,
    `event_time` DateTime,
    `event_time_microseconds` DateTime64(6),
    `duration_ms` UInt64,
    `exception` String,
    `partition_map` String,
    `insert_overwrite_query` String,
    `query_id` String,
    `insert_overwrite_query_id` String,
    `local_task_partition` String,
    `local_task_query` Map(String, String) KV
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
SETTINGS index_granularity = 8192;

字段

  • database, view - 视图的库表名。
  • status - 刷新任务运行状态,分别是START, FINISH, EXCEPTION_EXECUTE_TASK, EXCEPTION_BEFORE_START。
  • refresh_type - 刷新任务类型,PARTITION_REFRESH表示当前任务按照分区粒度对齐刷新,FULL_REFRESH表示当前任务刷新所有分区。
  • refresh_mode - 刷新任务模式,LOCAL_SYNC表示同步本地刷新,LOCAL_ASYNC表示异步本地刷新,DISTRIBUTED_ASYNC-分布式异步刷新 (异步视图的任务类型)。
  • event_date, event_time - 执行的时间。
  • duration_ms - 执行时长。
  • exception - 异常信息。
  • partition_map - DISTRIBUTED_ASYNC类型会计算分区映射关系,目的表使用哪些基表进行计算。
  • insert_overwrite_query - 刷新任务insert overwrite 语句。
  • query_id - 当前任务query id。
  • insert_overwrite_query_id - 执行insert overwrite的query id。
  • local_task_partition - LOCAL_SYNC的partition信息。
  • local_task_query - 执行LOCAL_SYNC视图执行query的信息,key:query_id,value: query。

示例

异步视图任务

同步视图任务

获取coodinator节点host Address

SELECT concat(if(isIPv6String(cluster.host_name), '[', ''), cluster.host_name, if(isIPv6String(cluster.host_name), ']', ''), ':', cluster.port) AS coodinator_address
FROM
(
    SELECT regexpExtract(coordinator_address, 'host: ([a-fA-F0-9:.]+)', 1) AS extracted_ip
    FROM system.view_tables
    WHERE (table = 'mv_1') AND (database = 'test')
) AS coodinator
INNER JOIN
(
    SELECT
        host_name,
        port
    FROM system.clusters
) AS cluster ON coodinator.extracted_ip = cluster.host_name

从coodinator节点读取view_refresh_task_log

SELECT *
FROM remote('[fdbd:dc61:1a:393::30]:9030', system, view_refresh_task_log, 'default', 'P8rZPviauQIIIQIL')
WHERE event_date = today() and view = 'event_statistics_optimize_mv' and database = 'governance_vision_realtime_db';

直接在各个节点创建system.view_refresh_task_log

CREATE TABLE system.view_refresh_task_log on cluster ch_rd_dev_mv_test
(
    `database` String,
    `view` String,
    `status` Enum8('START' = 1, 'FINISH' = 2, 'EXCEPTION_EXECUTE_TASK' = 3, 'EXCEPTION_BEFORE_START' = 4),
    `refresh_type` Enum8('NONE' = 1, 'PARTITION_REFRESH' = 2, 'FULL_REFRESH' = 3),
    `refresh_mode` Enum8('LOCAL_SYNC' = 1, 'GAOBAL_ASYNC' = 2),
    `event_date` Date,
    `event_time` DateTime,
    `duration_ms` UInt64,
    `exception` String,
    `partition_map` String,
    `insert_overwrite_query` String,
    `query_id` String,
    `insert_overwrite_query_id` String,
    `local_task_partition` String,
    `local_task_query` Map(String, String) KV
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
SETTINGS index_granularity = 8192;

视图命中状态

TCP

JSON FORMAT

HTTP