You need to enable JavaScript to run this app.
导航
概览
最近更新时间:2025.03.28 15:32:05首次发布时间:2025.03.28 15:32:05
我的收藏
有用
有用
无用
无用

1. Catalog 的核心功能

Flink Catalog 是 Apache Flink 的元数据管理中心,用于统一管理数据库、表等元数据信息。其核心功能包括:

  1. **元数据持久化:**将表结构(Schema)、数据源位置、分区信息等元数据持久化存储,避免每次作业启动时重复声明。
  2. **统一元数据接口:**提供一致的 API 和 SQL 接口访问元数据,支持跨会话(Session)共享元数据。
  3. **多 Catalog 支持:**允许同时管理多个 Catalog(如 Hive Catalog、JDBC Catalog),实现不同数据源的统一管理。
  4. **与外部系统集成:**支持与 Hive Metastore、关系型数据库(如 PostgreSQL)等外部元数据存储集成。

2. 核心用途

  1. 统一元数据管理在流批一体作业中,统一管理 Kafka、MySQL、Hive 等异构数据源的元数据,避免重复定义表结构。
  2. 简化作业配置通过 Catalog 预先注册表,作业中直接通过表名引用,无需在 SQL 中重复指定 CREATE TABLE DDL。
  3. 跨会话持久化Catalog 的元数据在 Flink 会话之间持久化,适合长期运行的作业或需要共享元数据的场景。
  4. 权限与审计结合外部 Catalog(如 Hive),实现元数据的权限控制和审计(需依赖外部系统功能)。

3. Catalog 类型及适用场景

当前 Flink 平台支持以下类型的 Catalog

Catalog 类型

SQL 类型

存储方式

适用场景

HiveCatalog

hive

Hive Metastore

生产环境,集成 Hive 生态,适用于传统大数据场景。

JdbcCatalog

jdbc

关系型数据库

需要 JDBC 兼容的元数据存储(如 MySQL、PostgreSQL 等),适合与关系型数据库集成。

LASCatalog

hive

自定义存储

配合 LAS Catalog 统一管理数据湖元数据,适用于数据湖场景。

Paimon

paimon + filesystem/hive/las

支持基于文件系统、Hive、Las 多种元数据解决方案

利用 Paimon 处理流批一体,近实时的大规模数据分析场景,适合流批混合计算需求。

4. 使用步骤详解(以 Hive Catalog 为例)

1. 创建 Catalog

通过 Flink SQL 或编程 API 创建 Catalog。

SQL 示例

CREATE CATALOG hive_catalog WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/path/to/hive-conf-dir'
);

Java API 示例

String name = "hive_catalog";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive-conf-dir";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(name, hiveCatalog);

2. 使用 Catalog 进行查询

直接通过 Catalog 名称引用表。

USE CATALOG hive_catalog;
SELECT * FROM default.kafka_user_behavior WHERE behavior = 'click';

3. 跨 Catalog 访问

访问不同 Catalog 中的表(如同时使用 Hive 和 JDBC Catalog)。

SELECT *
FROM hive_catalog.default.logs
JOIN jdbc_catalog.sales.orders
ON logs.order_id = orders.id;

4. Create Table Like 语句

在 Flink SQL 中,CREATE TABLE LIKE 语句用于基于现有表的定义创建一个新表。新表将继承原表的结构(包括列定义、数据类型、约束等),但不会复制原表的数据。
这个功能结合 Catalog 在需要创建结构相似的表时非常有用,可以避免重复定义表的模式。

语法

CREATE TABLE [IF NOT EXISTS] new_table_name
LIKE `catalog`.`db`.`existing_table_name`
[WITH (key1=val1, key2=val2, ...)]

参数说明

  • new_table_name:要创建的新表的名称。
  • existing_table_name:现有表的名称,新表将基于此表的定义创建。
  • IF NOT EXISTS:可选关键字。如果指定,当新表已经存在时,不会抛出异常,也不会执行任何操作。
  • WITH (key1=val1, key2=val2, ...):可选子句。用于为新表指定额外的属性或配置。这些属性可以覆盖原表的属性。

功能特点

  1. 复制表结构CREATE TABLE LIKE 会复制原表的列定义、数据类型、主键约束等信息。新表和原表的结构完全一致,但它们是独立的表,互不影响。
  2. 不复制数据:新表不会复制原表中的数据,仅复制表的结构。
  3. 支持属性覆盖:可以通过 WITH 子句为新表指定新的属性,覆盖原表的属性。例如,可以更改表的存储格式、连接信息等。
  4. 避免重复定义:当需要创建多个结构相似的表时,使用 CREATE TABLE LIKE 可以显著减少重复代码。

示例

已有外部 Hive Cataog 作为数据下游,我们需要根据这个 Hive Catalog 创建完全相同的 Kafka 数据表:

CREATE CATALOG hive_catalog WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/path/to/hive-conf-dir'
);

-- 相对应上游 Kafka 数据源
CREATE TABLE kafka_source (
    -- 增加 watermark 定义
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    ...
)
LIKE `hive_catalog`.`db`.`orders` (
    -- 排除 Hive 的包括分区、和文件系统所有的配置选项
    EXCLUDING ALL
);

表合并策略

表特性的合并逻辑可以通过 LIKE 选项进行控制。您可以控制以下特性的合并行为:

  • CONSTRAINTS:约束,例如主键和唯一键。
  • GENERATED:计算列。
  • METADATA:元数据列。
  • OPTIONS:描述连接器和格式属性的连接器选项。
  • PARTITIONS:表的分区。
  • WATERMARKS:水印声明。

合并策略有三种:

  • INCLUDING:保留源表的特性,如果存在重复条目(例如,如果两个表中存在相同键的选项),则会失败。
  • EXCLUDING:不包含源表的给定特性。
  • OVERWRITING:包含源表的特性,并使用新表的属性覆盖源表的重复条目(例如,如果两个表中存在相同键的选项,则使用当前语句中的选项)。

此外,您可以使用 INCLUDING/EXCLUDING ALL 选项来指定在没有定义特定策略时应采用的策略。例如,如果您使用 EXCLUDING ALL INCLUDING WATERMARKS,则仅从源表中包含水印。

5. CTAS 语法

表也可以通过查询结果在一条 Create-Table-As-Select (CTAS) 语句中创建并填充数据。CTAS 是使用单条命令创建表并插入数据的最简单且最快的方式。
CTAS 包含两部分:

  1. SELECT 部分:可以是 Flink SQL 支持的任何 SELECT 查询。
  2. CREATE 部分:从 SELECT 部分获取结果模式,并创建目标表。与 CREATE TABLE 类似,CTAS 需要在 WITH 子句中指定目标表的必需选项。

CTAS 的创建表操作依赖于目标 Catalog。例如:

  • Hive Catalog:会自动在 Hive 中创建物理表。
  • 内存 Catalog:会在执行 SQL 的客户端内存中注册表元数据。

例如如下语句

CREATE TABLE my_ctas_table
WITH (
    'connector' = 'kafka',
    ...
)
AS SELECT id, name, age FROM `catalog`.`db`.`source_table` WHERE mod(id, 10) = 0;

生成的表 my_ctas_table 将等效于使用以下语句创建表并插入数据:

CREATE TABLE my_ctas_table (
    id BIGINT,
    name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    ...
);
 
INSERT INTO my_ctas_table SELECT id, name, age 
FROM `catalog`.`db`.`source_table` WHERE mod(id, 10) = 0;

注意事项:

  1. CTAS 的限制
    • 目前不支持创建临时表。
    • 目前不支持显式指定列。
    • 目前不支持显式指定水印。
    • 目前不支持创建分区表。
    • 目前不支持指定主键约束。
  2. 非原子性:目前,CTAS 创建的目标表是非原子性的。如果在插入数据时发生错误,表不会自动删除。