Apache Paimon 是一种新型的流式数据湖存储技术,结合了 LSM-Tree 结构和湖格式,提供了高吞吐、低延迟的数据摄入、完整的流式变更订阅以及高效的 OLAP 查询能力。本手册将指导您如何使用 Flink 引擎通过基于 FileSystem 的元数据方案,进行 Paimon 的开发任务,涵盖从环境准备到数据写入、读取的全流程。
确保您已经:
操作路径:作业开发 - Flink SQL 作业 - 创建作业
参考文档:开发 Flink SQL 任务
在 Flink 中使用 Paimon 的第一步是创建一个 Catalog。Catalog 是 Paimon 中用于管理数据库和表的元数据存储。在创建 Catalog 之前需要创建相关的 TOS 桶和 Catalog 文件夹。相关文档,请参见创建存储桶、创建文件夹。
注意:请确保 Flink 和 TOS 处在同一个 Region,Flink 当前暂不支持跨 Region 访问 TOS Bucket。
CREATE CATALOG ${catalog_name} WITH ( 'type' = 'paimon', 'warehouse' = 'tos://${bucket_name}/${catalog_name}' );
${catalog_name}
:Catalog 的名称,自定义。${bucket_name}
:存储 Paimon 数据的 TOS(对象存储)桶名称。在 Catalog 中创建一个 Database,用于组织和管理表。
CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
${db_name}
:Database 的名称,自定义。在 Database 中创建表,定义表结构和相关配置。
CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` ( word varchar, -- 示例字段 cnt bigint, PRIMARY KEY (word) NOT ENFORCED ) WITH ( 'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据 'changelog-producer' = 'input' -- 产生 changelog,用于下游流读 );
${table_name}
:表的名称,自定义。bucket
:分桶数量,推荐单个 bucket 存储 1GB 左右数据。changelog-producer
:
input
,表示产生根据上游新增数据,用于下游流式读取。具体参考 Changelog 产出机制进行详细选择。如果不需要 changelog,则使用 none
选项以节省存储和写入资源。以下示例展示了如何使用 Flink SQL 将数据写入 Paimon 表。
首先,创建一个数据源表,用于生成模拟数据。
CREATE TABLE doc_source (word varchar) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.word.length' = '30' );
connector
:使用 datagen
连接器生成模拟数据。rows-per-second
:每秒生成的行数。fields.word.length
:生成字段 word
的长度。将数据源表中的数据写入 Paimon 表。
INSERT INTO `paimon_test`.`default`.`doc_result` select t.word, count(1) from doc_source t GROUP BY t.word;
paimon_test
:Catalog 名称。default
:Database 名称。doc_result
:目标表名称。Paimon 的快照提交机制与 Flink 的 Checkpoint 紧密关联,因此合理配置 Checkpoint 参数对于确保下游流式或批处理任务能够及时感知上游数据变化至关重要。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下游任务才能观察到最新的数据插入、更新或删除操作。因此,根据业务需求调整 Checkpoint 间隔是优化数据可见性和处理时效性的关键。
注意:可以根据数据实时性要求降低或者提高 Checkpoint 的间隔。一般建议 Checkpoint 时间间隔不低于 1 分钟。如果数据延迟严重、数据更新频繁、数据新鲜度要求不高等场景,建议适当提升 Checkpoint 时间到 5 分钟或者更长。
Checkpoint 开启如下图,在 作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔 进行设置。
此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适资源池进行上线。
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。
可以进入 TOS 桶管理界面(需要账户有相关访问权限),可以看到已经在数据表的 TOS 目录下,产生了相关的数据文件。则说明数据写入成功。
以下示例展示了如何使用 Flink SQL 从 Paimon 表中流/批式读取数据。
创建一个打印表,用于输出读取的数据。
CREATE TABLE `print_table` ( word varchar, cnt bigint ) WITH ( 'connector' = 'print' );
connector
:使用 print
连接器将数据打印到控制台。从 Paimon 表中读取数据并写入打印表。
INSERT INTO `print_table` SELECT * FROM `paimon_test`.`default`.`doc_result`;
paimon_test
:Catalog 名称。default
:Database 名称。doc_result
:源表名称。