Apache Paimon 是一个基于流和批处理的实时数据湖解决方案,结合了高效的存储和灵活的计算,专为处理大规模实时数据和流式数据而设计。Paimon 为 Flink 引擎提供了完善的 Catalog 接口支持,可以通过 Catalog 方便的管理实时数据湖元数据。
在创建 Catalog 之前需要创建相关的 TOS 桶和 Catalog 文件夹。相关文档,请参见创建存储桶、创建文件夹。
注意:请确保 Flink 和 TOS 处在同一个 Region,Flink 当前暂不支持跨 Region 访问 TOS Bucket。
CREATE CATALOG ${catalog_name} WITH ( 'type' = 'paimon', 'warehouse' = 'tos://${bucket_name}/${catalog_name}' );
${catalog_name}
:Catalog 的名称,自定义。${bucket_name}
:存储 Paimon 数据的 TOS(对象存储)桶名称。前置条件:因为 Flink 同步 LAS 元数据,需要通过 API 接口访问。需要
在 LAS Catalog 产品中创建 Paimon 的数据目录,需要参考数据目录管理,进行数据目录创建。需要填写数据目录名称和数据目录存储位置。
注意:这里的 TOS 桶和目录需要提前创建好,也需要和 Flink SQL 中的目录地址保持一致。
并且在 LAS Catalog 权限管理模块,选择对于刚刚创建好的 Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <!-- `metastore.catalog.default` 是写入 Hive Catalog 的名称,如果不填默认是 hive --> <property> <name>metastore.catalog.default</name> <value>paimon_test</value> </property> <property> <name>hive.server2.max.start.attempts</name> <value>5</value> </property> <!-- `hive.client.las.region.name` 是 LAS 和 Flink 服务所在 Region `hive.metastore.uris` 是 LAS 的 thrift 接口地址,需要和所在 Region 保持一致 --> <property> <name>hive.client.las.region.name</name> <value>cn-shanghai</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://lakeformation.las.cn-shanghai.ivolces.com:48869</value> </property> <property> <name>hive.hms.client.is.public.cloud</name> <value>true</value> </property> <!-- `hive.client.las.ak/sk` 是 LAS 服务访问的 AccessKeyId 和 AccessKeySecret 填写前请确认子账号拥有 LAS 的 IAM 访问权限,和 Catalog 的数据权限 --> <property> <name>hive.client.las.ak</name> <value>__LAS_ACCESS_KEY__</value> </property> <property> <name>hive.client.las.sk</name> <value>__LAS_ACCESS_KEY_SECRET__</value> </property> </configuration>
因为 LAS Catalog 提供兼容 Hive 的元数据 thrift 接口。所以在 Flink 中使用 LAS Catalog 的方式和基于 Hive 的 Catalog 的方式非常相似。我们可以通过如下 SQL 创建 LAS Catalog:
CREATE CATALOG ${catalog_name} WITH ( 'type' = 'paimon', 'metastore' = 'hive', -- uri 填写 LAS 的内网地址如下,不同区域可以参考 'uri' = 'thrift://lakeformation.las.cn-guangzhou.ivolces.com:48869', 'hive-conf-dir' = '/opt/tiger/workdir', 'warehouse' = 'tos://${bucket_name}/${catalog_name}' );
其中两个变量需要按照如下:
${catalog_name}
:Catalog 的名称,和 LAS 中的 Catalog 名称需要保持一致。${bucket_name}
:存储 Paimon 数据的 TOS(对象存储)桶名称。WITH 参数的意义如下:
type
:选择 paimon 类型 Catalog。metastore
:选择 hive 的方式进行元数据管理。uri
:LAS Catalog 的元数据 thrift 接口,可以参考示例中的地址,将cn-guangzhou
改成和 Flink 服务相同的区域即可。hive-conf-dir
:这里指定 hive-site.xml 的文件路径。注意:按照本篇文档内容测试方案,此处需要固定填写 /opt/tiger/workdir
。warehouse
:和 LAS Catalog 中的数据目录存储位置保持一致。在 Catalog 中创建一个 Database,用于组织和管理表。
CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
${db_name}
:Database 的名称,自定义。在 Database 中创建表,定义表结构和相关配置。
CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` ( word varchar, -- 示例字段 cnt bigint, PRIMARY KEY (word) NOT ENFORCED ) WITH ( 'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据 'changelog-producer' = 'input' -- 产生 changelog,用于下游流读 );
${table_name}
:表的名称,自定义。bucket
:分桶数量,推荐单个 bucket 存储 1GB 左右数据。changelog-producer
:
input
,表示产生根据上游新增数据,用于下游流式读取。具体参考 Changelog 产出机制进行详细选择。如果不需要 changelog,则使用 none
选项以节省存储和写入资源。Paimon 查询 SQL 同时支持流读、批读,只要在 Flink 运行过程中选择对应的模式即可:
INSERT INTO `print_table` SELECT * FROM `paimon_test`.`default`.`doc_result`;
Flink Insert 语句支持流写、批写两种语义,只要在运行过程中选择相应的执行模式即可:
INSERT INTO `paimon_test`.`default`.`doc_result` select t.word, count(1) from doc_source t GROUP BY t.word;
除了 Insert 语句之外,Paimon 也支持对数据表、分区等进行批式覆盖写:
-- 覆盖写入非分区表 INSERT OVERWRITE my_table SELECT ... -- 覆盖写入分区表 INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...