You need to enable JavaScript to run this app.
导航
Paimon 使用 LAS Catalog 管理元数据
最近更新时间:2025.02.10 10:49:01首次发布时间:2025.02.05 20:03:59
我的收藏
有用
有用
无用
无用

1. 概述

Apache Paimon 是一种新型的流式数据湖存储技术,结合了 LSM-Tree 结构和湖格式,提供了高吞吐、低延迟的数据摄入、完整的流式变更订阅以及高效的 OLAP 查询能力。本手册将指导您如何使用 Flink 引擎进行 Paimon 的开发任务,并且利用 LAS Catalog 统一管理 Paimon 湖的元数据。

2. 环境准备

确保您已经:

  1. 开通了流式计算 Flink 版产品,并能够在作业开发中创建 Flink SQL 任务。
  2. 已经在资源管理 - 资源池功能模块购买了按量或者包年包月资源池,可以正常提交 Flink 任务。
  3. 开通了 LAS Catalog 统一元数据管理服务,可以参考 LAS Catalog 开通文档

  • Flink 版本需 >= 1.16,具体支持功能如下:
    • Flink 1.16 内置 Paimon 0.6 版本。
    • Flink 1.17 内置 Paimon 0.8.2 字节加强版功能,支持如存储过程等语法。

操作路径:作业开发 - Flink SQL 作业 - 创建作业。
参考文档开发 Flink SQL 任务

Image

3. 创建 Catalog

前置条件:因为 Flink 同步 LAS 元数据,需要通过 API 接口访问。需要

  1. 使用 API 密钥管理为子账号创建 AccessKey / AccessKeySecret (后续需要填写在 hive-site.xml 中)。
  2. 在权限管理中为子账号开通 LASFullAccess 权限。

3.1 LAS 数据目录创建和授权

在 LAS Catalog 产品中创建 Paimon 的数据目录,需要参考数据目录管理,进行数据目录创建。需要填写数据目录名称和数据目录存储位置。

注意:这里的 TOS 桶和目录需要提前创建好,也需要和 Flink SQL 中的目录地址保持一致。

Image

并且在 LAS Catalog 权限管理模块,选择对于刚刚创建好的 Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
Image

3.1 任务上传依赖文件

  1. 操作路径:控制台 - 进入项目 - 作业开发 - 创建 SQL/CDC 任务 - 参数配置(右侧按钮) - 依赖文件。
  2. 准备依赖文件
    1. 下载 LAS Catalog Connector。
flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT.jar
未知大小
  1. 下载 hive-site.xml 文件模板(文件名必须保证一模一样),修改相关访问和认证信息。
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <!--
  `metastore.catalog.default` 是写入 Hive Catalog 的名称,如果不填默认是 hive 
  -->
  <property>
    <name>metastore.catalog.default</name>
    <value>paimon_test</value>
  </property>
  <property>
    <name>hive.server2.max.start.attempts</name>
    <value>5</value>
  </property>
  
  <!--
  `hive.client.las.region.name` 是 LAS 和 Flink 服务所在 Region
  `hive.metastore.uris` 是 LAS 的 thrift 接口地址,需要和所在 Region 保持一致
  -->
  <property>
    <name>hive.client.las.region.name</name>
    <value>cn-shanghai</value>
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://lakeformation.las.cn-shanghai.ivolces.com:48869</value>
  </property>

  <property>
    <name>hive.hms.client.is.public.cloud</name>
    <value>true</value>
  </property>

  <!--
  `hive.client.las.ak/sk` 是 LAS 服务访问的 AccessKeyId 和 AccessKeySecret
  填写前请确认子账号拥有 LAS 的 IAM 访问权限,和 Catalog 的数据权限
  -->
  <property>
    <name>hive.client.las.ak</name>
    <value>__LAS_ACCESS_KEY__</value>
  </property>
  <property>
    <name>hive.client.las.sk</name>
    <value>__LAS_ACCESS_KEY_SECRET__</value>
  </property>
</configuration>
  1. 上传依赖文件:如下图,执行上传文件资源的操作:

Image

因为 LAS Catalog 提供兼容 Hive 的元数据 thrift 接口。所以在 Flink 中使用 LAS Catalog 的方式和基于 Hive 的 Catalog 的方式非常相似。我们可以通过如下 SQL 创建 LAS Catalog:

CREATE CATALOG ${catalog_name} WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- uri 填写 LAS 的内网地址如下,不同区域可以参考
    'uri' = 'thrift://lakeformation.las.cn-guangzhou.ivolces.com:48869',
    'hive-conf-dir' = '/opt/tiger/workdir',
    'warehouse' = 'tos://${bucket_name}/${catalog_name}'
);

其中两个变量需要按照如下:

  • ${catalog_name}:Catalog 的名称,和 LAS 中的 Catalog 名称需要保持一致。
  • ${bucket_name}:存储 Paimon 数据的 TOS(对象存储)桶名称。

WITH 参数的意义如下:

  • type:选择 paimon 类型 Catalog。
  • metastore:选择 hive 的方式进行元数据管理。
  • uri:LAS Catalog 的元数据 thrift 接口,可以参考示例中的地址,将cn-guangzhou改成和 Flink 服务相同的区域即可。
  • hive-conf-dir:这里指定 hive-site.xml 的文件路径。注意:按照本篇文档内容测试方案,此处需要固定填写 /opt/tiger/workdir
  • warehouse:和 LAS Catalog 中的数据目录存储位置保持一致。

4. 创建 Database 和 Table

4.1 创建 Database

在 Catalog 中创建一个 Database,用于组织和管理表。

CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
  • ${db_name}:Database 的名称,自定义。

4.2 创建非分区表

在 Database 中创建表,定义表结构和相关配置。以下是一个主键表的非分区表的示例

CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` (
    word varchar, -- 示例字段
    cnt bigint,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'bucket' = '4',  -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据
    'changelog-producer' = 'input', -- 产生 changelog,用于下游流读
);
  • ${table_name}:表的名称,自定义。
  • bucket:分桶数量,推荐单个 bucket 存储 1GB 左右数据。
  • changelog-producer
    • 设置为 input,表示产生根据上游新增数据,用于下游流式读取。具体参考 Changelog 产出机制进行详细选择。如果不需要 changelog,则使用 none选项以节省存储和写入资源。

4.3 创建分区表

在 Database 中创建表,定义表结构和相关配置。以下是一个主键表的分区表的示例:

CREATE TABLE
  IF NOT EXISTS paimon_test1.test_db5.test_table5 (
    word varchar, -- 示例字段
    cnt bigint,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, word) NOT ENFORCED -- 一般分区主键表的主键字段必须包含分区字段
  ) PARTITIONED BY (dt, hh)
WITH
  (
    'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据
    'changelog-producer' = 'none', -- 产生 changelog,用于下游流读
    'metastore.partitioned-table' = 'true' -- 确定是否将分区信息同步到 LAS 元数据管理
  );
  • metastore.partitioned-table:开启后将分区信息会同步到 LAS 元数据管理,默认 false不开启。

5. 数据写入示例

以下示例展示了如何使用 Flink SQL 将数据写入 Paimon 表。

5.1 创建数据源表

首先,创建一个数据源表,用于生成模拟数据。

CREATE TABLE doc_source (word varchar)
WITH
  (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.word.length' = '30'
  );
  • connector:使用 datagen 连接器生成模拟数据。
  • rows-per-second:每秒生成的行数。
  • fields.word.length:生成字段 word 的长度。

5.2 写入数据到 Paimon 表

将数据源表中的数据写入 Paimon 表。

INSERT INTO `paimon_test`.`default`.`doc_result`
select
  t.word,
  count(1)
from
  doc_source t
GROUP BY
  t.word;
  • paimon_test:Catalog 名称。
  • default:Database 名称。
  • doc_result:目标表名称。

5.3 开启 Checkpoint

Paimon 的快照提交机制与 Flink 的 Checkpoint 紧密关联,因此合理配置 Checkpoint 参数对于确保下游流式或批处理任务能够及时感知上游数据变化至关重要。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下游任务才能观察到最新的数据插入、更新或删除操作。因此,根据业务需求调整 Checkpoint 间隔是优化数据可见性和处理时效性的关键。

注意:可以根据数据实时性要求降低或者提高 Checkpoint 的间隔。一般建议 Checkpoint 时间间隔不低于 1 分钟。如果数据延迟严重、数据更新频繁、数据新鲜度要求不高等场景,建议适当提升 Checkpoint 时间到 5 分钟或者更长。

Checkpoint 开启如下图,在作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔进行设置。
Image

6. 上线任务

6.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。请注意:如果遇到 Caused by: org.apache.thrift.transport.TTransportException此类问题,可以参考 8.1 验证 SQL 时报错的描述。可以先暂时忽略此问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适的资源池和跳过上线前深度检查后。可以上线任务。
Image
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

6.2 确认任务执行成功

可以进入 TOS 桶管理界面(需要账户有相关访问权限),可以看到已经在数据表的 TOS 目录下,产生了相关的数据文件。则说明数据写入成功。

Image

7. 数据读取示例

我们以上已经确认了数据写入成功,以下示例展示了如何使用 Flink SQL 从 Paimon 表中流/批式读取数据。进一步可以确认数据准确性。

7.1 创建打印表

创建一个打印表,用于输出读取的数据。

CREATE TABLE `print_table` (
    word varchar,
    cnt bigint
) WITH (
    'connector' = 'print'
);
  • connector:使用 print 连接器将数据打印到控制台。

7.2 读取 Paimon 表数据

从 Paimon 表中读取数据并写入打印表。

INSERT INTO `print_table`
SELECT * FROM `paimon_test`.`default`.`doc_result`;
  • paimon_test:Catalog 名称。
  • default:Database 名称。
  • doc_result:源表名称。

7.3 使用 LAS Catalog 查看元数据

参考查看数据表,进行数据表的元数据查询。

Image

7.4 使用 EMR Serverless Spark 查询 Paimon 数据

参考 Spark 访问 Paimon ,通过 Spark 引擎对 Paimon 的湖数据进行批式处理。

8. 常见问题

8.1 验证 SQL 时报错

如果在验证 SQL 的时候(点击验证按钮,或者上线时候自动检查 SQL)报错如下,形如 Caused by: org.apache.thrift.transport.TTransportException此类错误,说明当前连接 LAS 接口不同。请不要慌张,当前版本暂时无法在验证 SQL 阶段访问 LAS 元数据。

org.apache.flink.table.api.ValidationException: Unable to create catalog 'paimon_test1'.

Catalog options are:
'hive-conf-dir'='/opt/tiger/workdir'
'metastore'='hive'
'type'='paimon'
'uri'='thrift://lakeformation.las.cn-beijing.ivolces.com:48869'
'warehouse'='tos://flink-cwz-paimon/paimon_test1'
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:511)
        ...
Caused by: java.lang.RuntimeException: Failed to determine if database default exists
        at org.apache.paimon.hive.HiveCatalog.databaseExistsImpl(HiveCatalog.java:223)
        ... 9 more
Caused by: org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        ... 15 more

解决方案:任务上线过程中选择更多设置 - 跳过上线前的深度检查。
Image

8.2 运行时访问 LAS 接口失败

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口无法访问

Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException

Image
可能原因

  1. 没有上传 hive-site.xml 文件,或者文件名不正确
    1. 解决方法:检查 hive-site.xml 是否成功上传到依赖文件中,并且文件名必须完全符合要求。
  2. 访问 LAS 的 AK/SK 不正确,无法正确认证用户信息。
    1. 解决方法:检查 hive-site.xml 中 AccessKey 和 AccessKeySecret 是否正确。
  3. 访问 LAS 的用户没有 IAM 的 LASFullAccess 权限,请联系管理者开通该权限
    1. 解决方法:联系主账号管理者,检查是否为用户开通 LASFullAccess 权限。

8.3 访问 LAS 报接口无权限

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口没有成功授权

Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Access denied: [DeniedPrivilege(resource:Resource{resourceScope='SCHEMA', catalogName='paimon_test1', schemaName='test_db'}, action:DESCRIBE)] for user: 31035840

Image
解决方法:这个问题是因为在 LAS Catalog 中没有给指定账号赋予相关权限。请结合报错日志信息提示的 action,参考数据目录管理,为账号开通权限即可。

8.4 任务无法启动,报 LAS 数据库不存在

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且运行事件中发现如下报错,org.apache.flink.table.catalog exceptions.DatabaseNotExistException
Image
解决办法:这个原因是因为在 Flink 任务提交阶段,静态解析 SQL 的时候,当前不会去连接 LAS 获取已有的数据库,所以必须在 SQL 中显式写明建库语句。在 SQL 代码中加入以下语句:

CREATE TABLE IF NOT EXISTS test_db;

重新提交任务之后,就可以恢复正常。

8.5 hive-site.xml 格式不正确

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且日志中发现如下报错,com.ctc.wstx.exc.WstxParsingException

Caused by: java.lang.RuntimeException: com.ctc.wstx.exc.WstxParsingException: Illegal processing instruction target ("xml"); xml (case insensitive) is reserved by the specs.
 at [row,col {unknown-source}]: [2,5]

Image
解决办法:这个原因是因为对接 LAS 元数据中依赖的 hive-site.xml 的格式可能不正确。需要检查 xml 文件是否符合语法规范。常见 xml 格式问题可以参考:

  1. 文件开头的内容必须是 <?xml ...,在尖括号前方不能包含任何不可见字符、空格、空行等。
  2. xml 文档内必须包含合法的标签,比如在内容中不能出现 <>&等特殊字符,如果出现需要用 CDATA 语法标记为普通文本。

8.6 删除 LAS Catalog 的库表应该怎么操作

如果需要删除 LAS 元数据中的库表,需要同时手动删除 LAS 元数据中的库表信息,以及 TOS 目录上的数据库表的文件路径。如果仅仅删除 LAS 元数据或者仅仅删除 TOS 目录的数据都会造成数据不一致。报以下类似的错误,导致任务失败:

  1. 仅删除 TOS 文件路径,但未删除 LAS 元数据

Image

  1. 仅删除 LAS 元数据,但未删除 TOS 文件数据

Image
解决方案:判断属于哪一种情况,将已有的 LAS 元数据和 TOS 文件数据都删除后才能保证数据库表继续正常写入。

8.7 LAS 中数据表是非分区表,TOS 文件系统上却有分区

如果我们使用了分区表,但是发现 LAS 和 TOS 的表信息不同:

  1. LAS 界面展示表为分区表

Image

  1. TOS 界面展示数据表已经产生分区

Image
解决方案:Paimon 不会自动将表的分区信息同步到 LAS 元数据管理。如果需要在 LAS 元数据管理看到数据表的分区字段,需要在建表语句中增加如下 WITH 参数:

'metastore.partitioned-table' = 'true' -- 确定是否将分区信息同步到 LAS 元数据管理

需要注意的是:因为分区字段无法动态增加,增加参数后,需要将原有的数据表清掉(包括 LAS 元数据和 TOS 的数据文件),然后重新创建。