Apache Paimon 是一种新型的流式数据湖存储技术,结合了 LSM-Tree 结构和湖格式,提供了高吞吐、低延迟的数据摄入、完整的流式变更订阅以及高效的 OLAP 查询能力。本手册将指导您如何使用 Flink 引擎进行 Paimon 的开发任务,并且利用 LAS Catalog 统一管理 Paimon 湖的元数据。
确保您已经:
操作路径:作业开发 - Flink SQL 作业 - 创建作业。
参考文档:开发 Flink SQL 任务
前置条件:因为 Flink 同步 LAS 元数据,需要通过 API 接口访问。需要
在 LAS Catalog 产品中创建 Paimon 的数据目录,需要参考数据目录管理,进行数据目录创建。需要填写数据目录名称和数据目录存储位置。
注意:这里的 TOS 桶和目录需要提前创建好,也需要和 Flink SQL 中的目录地址保持一致。
并且在 LAS Catalog 权限管理模块,选择对于刚刚创建好的 Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <!-- `metastore.catalog.default` 是写入 Hive Catalog 的名称,如果不填默认是 hive --> <property> <name>metastore.catalog.default</name> <value>paimon_test</value> </property> <property> <name>hive.server2.max.start.attempts</name> <value>5</value> </property> <!-- `hive.client.las.region.name` 是 LAS 和 Flink 服务所在 Region `hive.metastore.uris` 是 LAS 的 thrift 接口地址,需要和所在 Region 保持一致 --> <property> <name>hive.client.las.region.name</name> <value>cn-shanghai</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://lakeformation.las.cn-shanghai.ivolces.com:48869</value> </property> <property> <name>hive.hms.client.is.public.cloud</name> <value>true</value> </property> <!-- `hive.client.las.ak/sk` 是 LAS 服务访问的 AccessKeyId 和 AccessKeySecret 填写前请确认子账号拥有 LAS 的 IAM 访问权限,和 Catalog 的数据权限 --> <property> <name>hive.client.las.ak</name> <value>__LAS_ACCESS_KEY__</value> </property> <property> <name>hive.client.las.sk</name> <value>__LAS_ACCESS_KEY_SECRET__</value> </property> </configuration>
因为 LAS Catalog 提供兼容 Hive 的元数据 thrift 接口。所以在 Flink 中使用 LAS Catalog 的方式和基于 Hive 的 Catalog 的方式非常相似。我们可以通过如下 SQL 创建 LAS Catalog:
CREATE CATALOG ${catalog_name} WITH ( 'type' = 'paimon', 'metastore' = 'hive', -- uri 填写 LAS 的内网地址如下,不同区域可以参考 'uri' = 'thrift://lakeformation.las.cn-guangzhou.ivolces.com:48869', 'hive-conf-dir' = '/opt/tiger/workdir', 'warehouse' = 'tos://${bucket_name}/${catalog_name}' );
其中两个变量需要按照如下:
${catalog_name}
:Catalog 的名称,和 LAS 中的 Catalog 名称需要保持一致。${bucket_name}
:存储 Paimon 数据的 TOS(对象存储)桶名称。WITH 参数的意义如下:
type
:选择 paimon 类型 Catalog。metastore
:选择 hive 的方式进行元数据管理。uri
:LAS Catalog 的元数据 thrift 接口,可以参考示例中的地址,将cn-guangzhou
改成和 Flink 服务相同的区域即可。hive-conf-dir
:这里指定 hive-site.xml 的文件路径。注意:按照本篇文档内容测试方案,此处需要固定填写 /opt/tiger/workdir
。warehouse
:和 LAS Catalog 中的数据目录存储位置保持一致。在 Catalog 中创建一个 Database,用于组织和管理表。
CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
${db_name}
:Database 的名称,自定义。在 Database 中创建表,定义表结构和相关配置。以下是一个主键表的非分区表的示例
CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` ( word varchar, -- 示例字段 cnt bigint, PRIMARY KEY (word) NOT ENFORCED ) WITH ( 'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据 'changelog-producer' = 'input', -- 产生 changelog,用于下游流读 );
${table_name}
:表的名称,自定义。bucket
:分桶数量,推荐单个 bucket 存储 1GB 左右数据。changelog-producer
:
input
,表示产生根据上游新增数据,用于下游流式读取。具体参考 Changelog 产出机制进行详细选择。如果不需要 changelog,则使用 none
选项以节省存储和写入资源。在 Database 中创建表,定义表结构和相关配置。以下是一个主键表的分区表的示例:
CREATE TABLE IF NOT EXISTS paimon_test1.test_db5.test_table5 ( word varchar, -- 示例字段 cnt bigint, dt STRING, hh STRING, PRIMARY KEY (dt, hh, word) NOT ENFORCED -- 一般分区主键表的主键字段必须包含分区字段 ) PARTITIONED BY (dt, hh) WITH ( 'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据 'changelog-producer' = 'none', -- 产生 changelog,用于下游流读 'metastore.partitioned-table' = 'true' -- 确定是否将分区信息同步到 LAS 元数据管理 );
metastore.partitioned-table
:开启后将分区信息会同步到 LAS 元数据管理,默认 false
不开启。以下示例展示了如何使用 Flink SQL 将数据写入 Paimon 表。
首先,创建一个数据源表,用于生成模拟数据。
CREATE TABLE doc_source (word varchar) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.word.length' = '30' );
connector
:使用 datagen
连接器生成模拟数据。rows-per-second
:每秒生成的行数。fields.word.length
:生成字段 word
的长度。将数据源表中的数据写入 Paimon 表。
INSERT INTO `paimon_test`.`default`.`doc_result` select t.word, count(1) from doc_source t GROUP BY t.word;
paimon_test
:Catalog 名称。default
:Database 名称。doc_result
:目标表名称。Paimon 的快照提交机制与 Flink 的 Checkpoint 紧密关联,因此合理配置 Checkpoint 参数对于确保下游流式或批处理任务能够及时感知上游数据变化至关重要。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下游任务才能观察到最新的数据插入、更新或删除操作。因此,根据业务需求调整 Checkpoint 间隔是优化数据可见性和处理时效性的关键。
注意:可以根据数据实时性要求降低或者提高 Checkpoint 的间隔。一般建议 Checkpoint 时间间隔不低于 1 分钟。如果数据延迟严重、数据更新频繁、数据新鲜度要求不高等场景,建议适当提升 Checkpoint 时间到 5 分钟或者更长。
Checkpoint 开启如下图,在作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔进行设置。
此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。请注意:如果遇到 Caused by: org.apache.thrift.transport.TTransportException
此类问题,可以参考 8.1 验证 SQL 时报错的描述。可以先暂时忽略此问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适的资源池和跳过上线前深度检查后。可以上线任务。
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。
可以进入 TOS 桶管理界面(需要账户有相关访问权限),可以看到已经在数据表的 TOS 目录下,产生了相关的数据文件。则说明数据写入成功。
我们以上已经确认了数据写入成功,以下示例展示了如何使用 Flink SQL 从 Paimon 表中流/批式读取数据。进一步可以确认数据准确性。
创建一个打印表,用于输出读取的数据。
CREATE TABLE `print_table` ( word varchar, cnt bigint ) WITH ( 'connector' = 'print' );
connector
:使用 print
连接器将数据打印到控制台。从 Paimon 表中读取数据并写入打印表。
INSERT INTO `print_table` SELECT * FROM `paimon_test`.`default`.`doc_result`;
paimon_test
:Catalog 名称。default
:Database 名称。doc_result
:源表名称。参考查看数据表,进行数据表的元数据查询。
参考 Spark 访问 Paimon ,通过 Spark 引擎对 Paimon 的湖数据进行批式处理。
如果在验证 SQL 的时候(点击验证按钮,或者上线时候自动检查 SQL)报错如下,形如 Caused by: org.apache.thrift.transport.TTransportException
此类错误,说明当前连接 LAS 接口不同。请不要慌张,当前版本暂时无法在验证 SQL 阶段访问 LAS 元数据。
org.apache.flink.table.api.ValidationException: Unable to create catalog 'paimon_test1'. Catalog options are: 'hive-conf-dir'='/opt/tiger/workdir' 'metastore'='hive' 'type'='paimon' 'uri'='thrift://lakeformation.las.cn-beijing.ivolces.com:48869' 'warehouse'='tos://flink-cwz-paimon/paimon_test1' at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:511) ... Caused by: java.lang.RuntimeException: Failed to determine if database default exists at org.apache.paimon.hive.HiveCatalog.databaseExistsImpl(HiveCatalog.java:223) ... 9 more Caused by: org.apache.thrift.transport.TTransportException at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ... 15 more
解决方案:任务上线过程中选择更多设置 - 跳过上线前的深度检查。
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口无法访问
Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException
可能原因
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口没有成功授权
Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Access denied: [DeniedPrivilege(resource:Resource{resourceScope='SCHEMA', catalogName='paimon_test1', schemaName='test_db'}, action:DESCRIBE)] for user: 31035840
解决方法:这个问题是因为在 LAS Catalog 中没有给指定账号赋予相关权限。请结合报错日志信息提示的 action,参考数据目录管理,为账号开通权限即可。
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且运行事件中发现如下报错,org.apache.flink.table.catalog exceptions.DatabaseNotExistException
解决办法:这个原因是因为在 Flink 任务提交阶段,静态解析 SQL 的时候,当前不会去连接 LAS 获取已有的数据库,所以必须在 SQL 中显式写明建库语句。在 SQL 代码中加入以下语句:
CREATE TABLE IF NOT EXISTS test_db;
重新提交任务之后,就可以恢复正常。
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且日志中发现如下报错,com.ctc.wstx.exc.WstxParsingException
:
Caused by: java.lang.RuntimeException: com.ctc.wstx.exc.WstxParsingException: Illegal processing instruction target ("xml"); xml (case insensitive) is reserved by the specs. at [row,col {unknown-source}]: [2,5]
解决办法:这个原因是因为对接 LAS 元数据中依赖的 hive-site.xml 的格式可能不正确。需要检查 xml 文件是否符合语法规范。常见 xml 格式问题可以参考:
<?xml ...
,在尖括号前方不能包含任何不可见字符、空格、空行等。<
、>
、&
等特殊字符,如果出现需要用 CDATA 语法标记为普通文本。如果需要删除 LAS 元数据中的库表,需要同时手动删除 LAS 元数据中的库表信息,以及 TOS 目录上的数据库表的文件路径。如果仅仅删除 LAS 元数据或者仅仅删除 TOS 目录的数据都会造成数据不一致。报以下类似的错误,导致任务失败:
解决方案:判断属于哪一种情况,将已有的 LAS 元数据和 TOS 文件数据都删除后才能保证数据库表继续正常写入。
如果我们使用了分区表,但是发现 LAS 和 TOS 的表信息不同:
解决方案:Paimon 不会自动将表的分区信息同步到 LAS 元数据管理。如果需要在 LAS 元数据管理看到数据表的分区字段,需要在建表语句中增加如下 WITH 参数:
'metastore.partitioned-table' = 'true' -- 确定是否将分区信息同步到 LAS 元数据管理
需要注意的是:因为分区字段无法动态增加,增加参数后,需要将原有的数据表清掉(包括 LAS 元数据和 TOS 的数据文件),然后重新创建。