Flink Catalog 是 Apache Flink 的元数据管理中心,用于统一管理数据库、表等元数据信息。其核心功能包括:
CREATE TABLE
DDL。当前 Flink 平台支持以下类型的 Catalog
Catalog 类型 | SQL 类型 | 存储方式 | 适用场景 |
---|---|---|---|
HiveCatalog | hive | Hive Metastore | 生产环境,集成 Hive 生态,适用于传统大数据场景。 |
JdbcCatalog | jdbc | 关系型数据库 | 需要 JDBC 兼容的元数据存储(如 MySQL、PostgreSQL 等),适合与关系型数据库集成。 |
LASCatalog | hive | 自定义存储 | 配合 LAS Catalog 统一管理数据湖元数据,适用于数据湖场景。 |
Paimon | paimon + filesystem/hive/las | 支持基于文件系统、Hive、Las 多种元数据解决方案 | 利用 Paimon 处理流批一体,近实时的大规模数据分析场景,适合流批混合计算需求。 |
通过 Flink SQL 或编程 API 创建 Catalog。
SQL 示例
CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'hive-conf-dir' = '/path/to/hive-conf-dir' );
Java API 示例
String name = "hive_catalog"; String defaultDatabase = "default"; String hiveConfDir = "/path/to/hive-conf-dir"; HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir); tableEnv.registerCatalog(name, hiveCatalog);
直接通过 Catalog 名称引用表。
USE CATALOG hive_catalog; SELECT * FROM default.kafka_user_behavior WHERE behavior = 'click';
访问不同 Catalog 中的表(如同时使用 Hive 和 JDBC Catalog)。
SELECT * FROM hive_catalog.default.logs JOIN jdbc_catalog.sales.orders ON logs.order_id = orders.id;
在 Flink SQL 中,CREATE TABLE LIKE
语句用于基于现有表的定义创建一个新表。新表将继承原表的结构(包括列定义、数据类型、约束等),但不会复制原表的数据。
这个功能结合 Catalog 在需要创建结构相似的表时非常有用,可以避免重复定义表的模式。
CREATE TABLE [IF NOT EXISTS] new_table_name LIKE `catalog`.`db`.`existing_table_name` [WITH (key1=val1, key2=val2, ...)]
CREATE TABLE LIKE
会复制原表的列定义、数据类型、主键约束等信息。新表和原表的结构完全一致,但它们是独立的表,互不影响。WITH
子句为新表指定新的属性,覆盖原表的属性。例如,可以更改表的存储格式、连接信息等。CREATE TABLE LIKE
可以显著减少重复代码。已有外部 Hive Cataog 作为数据下游,我们需要根据这个 Hive Catalog 创建完全相同的 Kafka 数据表:
CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'hive-conf-dir' = '/path/to/hive-conf-dir' ); -- 相对应上游 Kafka 数据源 CREATE TABLE kafka_source ( -- 增加 watermark 定义 WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', ... ) LIKE `hive_catalog`.`db`.`orders` ( -- 排除 Hive 的包括分区、和文件系统所有的配置选项 EXCLUDING ALL );
表特性的合并逻辑可以通过 LIKE
选项进行控制。您可以控制以下特性的合并行为:
合并策略有三种:
此外,您可以使用 INCLUDING/EXCLUDING ALL
选项来指定在没有定义特定策略时应采用的策略。例如,如果您使用 EXCLUDING ALL INCLUDING WATERMARKS
,则仅从源表中包含水印。
表也可以通过查询结果在一条 Create-Table-As-Select (CTAS) 语句中创建并填充数据。CTAS 是使用单条命令创建表并插入数据的最简单且最快的方式。
CTAS 包含两部分:
CREATE TABLE
类似,CTAS 需要在 WITH
子句中指定目标表的必需选项。CTAS 的创建表操作依赖于目标 Catalog。例如:
例如如下语句
CREATE TABLE my_ctas_table WITH ( 'connector' = 'kafka', ... ) AS SELECT id, name, age FROM `catalog`.`db`.`source_table` WHERE mod(id, 10) = 0;
生成的表 my_ctas_table
将等效于使用以下语句创建表并插入数据:
CREATE TABLE my_ctas_table ( id BIGINT, name STRING, age INT ) WITH ( 'connector' = 'kafka', ... ); INSERT INTO my_ctas_table SELECT id, name, age FROM `catalog`.`db`.`source_table` WHERE mod(id, 10) = 0;
注意事项: