You need to enable JavaScript to run this app.
导航
Paimon
最近更新时间:2025.02.05 23:36:01首次发布时间:2025.01.14 17:41:38

Apache Paimon 是一个基于流和批处理的实时数据湖解决方案,结合了高效的存储和灵活的计算,专为处理大规模实时数据和流式数据而设计。Paimon 通过集成 Log-Structured Merge Tree (LSM Tree) 结构,实现高效的实时更新和查询能力,支持流批一体化的数据处理模式。其优势在于低延迟、强一致性和易扩展性。

功能特色

Paimon 具有以下功能特色,更多内容可以参考社区官方文档

统一存储

Paimon 提供表抽象,支持批处理和流处理模式下的数据读写操作。

  • 在批处理模式下,Paimon 类似于 Hive 表,支持各种批处理 SQL 操作,可查询最新快照。
  • 在流处理模式下,Paimon 类似于消息队列,允许查询类似于消息队列中变更日志的流数据,且历史数据不会过期。

丰富生态支持

多引擎生态系统支持:除了与 Apache Flink 的深度集成,Paimon 还支持其他计算引擎,如 Apache Hive、Apache Spark、 Trino、 StarRocks 等,提供灵活的数据读写能力。

高效存储与查询

  • Paimon 在文件系统或对象存储上以列式文件格式存储数据,元数据保存在清单文件中,支持大规模存储和数据跳过功能,提高查询性能。
  • 对于主键表,Paimon 采用 LSM 树结构,支持大规模数据更新和高性能查询。

灵活的数据读写模式

  • 读取方式:支持从历史快照(批处理模式)、最新偏移量(流处理模式)读取数据,或以混合方式读取增量快照。
  • 写入方式:支持从数据库变更日志(CDC)的流式同步,以及离线数据的批量插入或覆盖。

完善的湖管理能力

  • Paimon 的数据存储构建在对象存储 TOS 之上,基于这些低成本、高可用的分布式远端存储和可扩展的元数据管理设计,Paimon 提供了完整的数据湖能力:包括 ACID、 Schema Evolution、版本回溯、Tag、Branch 等。

功能详解

数据表类型

主键表(Primary Key Table):
主键表为 Paimon 默认表类型,可插入、更新、删除记录。主键由列组成,记录主键值须唯一。Paimon 在存储桶内排序主键确保数据有序,过滤查询主键时性能高。主键表用 LSM 树结构存储数据,支持高效读写。它还支持分区和存储桶机制提升数据管理与查询效率。

非主键表(Non Primary Key Table):

  • 追加表(Append Table):

若表未定义主键,默认为追加表。流式处理时,追加表只支持插入完整记录,适用于日志数据同步等无需流式更新的用例。默认无存储桶概念,类似 Hive 表,数据文件按分区组织排序以加速查询。流式写入作业中,追加表有自动小文件合并机制,由 Compact Coordinator 和 Compact Worker 组件扫描合并小文件,提高存储效率与查询性能。

  • 追加队列(Append Queue):

追加队列表为按存储桶划分的队列。同一存储桶内记录按写入顺序排列,流式读取按此顺序传递给下游,适用于需保证数据顺序场景如日志处理、事件溯源。默认所有数据入一个存储桶成一个队列,用户可通过定义参数增加并行度、分散负载。Append Queue 支持自动文件压缩以控制文件数量、提升存储和查询效率,流式读取时 Paimon 会按分区和存储桶顺序确保数据按预期顺序传递。

Merge-Engine

Paimon 提供多种合并引擎,支持不同的合并机制,适用于不同的场景。以下是主要的合并引擎机制:
Image
对这几种合并引擎的详细介绍如下:

合并引擎类型

描述

适用场景

去重(Deduplicate)

默认的合并引擎。Paimon 仅保留具有相同主键的最新记录,丢弃其他记录。特别地,如果最新记录是 DELETE 记录,则所有具有相同主键的记录都将被删除。可以通过配置 ignore-delete 来忽略删除操作。

适用于需要确保主键唯一性,并且只关心最新数据的场景。

部分更新(Partial Update)

允许通过多次更新逐步完善记录的各个列,即根据相同主键,逐个更新字段的最新值,但不会覆盖非空值。默认情况下,部分更新引擎不接受删除记录,可以通过配置 ignore-delete 来忽略删除记录,或配置 sequence-group 来撤销部分列。

适用于需要逐步更新记录的部分字段,而不影响其他字段的场景。适合主键多流 Join 的场景。

预聚合(Aggregation)

用户只关注聚合结果。根据指定的聚合函数,对具有相同主键的记录进行字段级别的聚合。每个非主键字段可以指定聚合函数,未指定的字段默认使用 last_non_null_value 聚合。

适用于需要对数据进行聚合计算的场景,例如求和、计数等。

首行(First Row)

保留具有相同主键的第一条记录,与去重引擎不同,首行引擎保留最早的记录。这种模式下产出变更日志是 INSERT 流。

适用于需要保留首次出现的记录,而忽略后续更新的场景。

Changelog 产出机制

changelog-producer 参数用于控制表在写入数据时生成变更日志(changelog)的方式。不同的模式适用于不同的应用场景,以下是主要的 changelog-producer 模式及其作用和适用场景:

模式名称

作用

适用场景

无(None)

默认模式,不生成额外的变更日志。Paimon 源只能看到跨快照合并后的更改,无法形成完整的变更日志。

适用于不需要完整变更日志的消费者,如数据库系统。需要注意的是,Flink 内置的 “Normalize” 操作符会在状态中持久化每个键的值,但这种操作代价高昂,应尽量避免。

输入(Input)

将输入视为完整的变更日志源,所有输入记录将被保存到独立的变更日志文件中,并通过 Paimon 源提供给消费者。

适用于输入本身就是完整的变更日志的情况,例如来自数据库的 CDC(变更数据捕获)数据,或由 Flink 有状态计算生成的数据。

查找(Lookup)

在提交数据写入之前,通过查找生成变更日志。Paimon 会在内存和本地磁盘上缓存数据,以生成完整的变更日志。

适用于输入无法生成完整变更日志,但希望避免昂贵的 “Normalize” 操作的情况。需要注意的是,Lookup 操作会消耗一定的资源,比较适用于实时性要求较高场景(分钟级)

完全压缩(Full Compaction)

通过比较完全压缩之间的结果,生成差异作为变更日志。变更日志的延迟受到完全压缩频率的影响。

适用于希望解耦数据写入和变更日志生成的高延迟场景,例如每隔一段时间(如小时级)生成一次变更日志。此模式可以减少资源消耗,但会增加变更日志的延迟。

限制条件
  • Paimon 连接器暂时仅支持在 Flink 1.16-volcano 及以上引擎版本中使用。
  • 如果以对象存储 TOS 作为存储目的地,需要提前建好存储 Bucket,可以参考文档。另外,为了下游批处理性能考虑,可以使用 TOS 分层桶(HNS)
  • 如果您需要使用统一元数据管理服务,可以参考 LAS Catalog 作为统一的湖元数据管理工具,需要提前开通 LAS Catalog 服务。

Paimon 版本

Flink 引擎版本

功能说明

0.6

Flink-1.16-volcano

没有内置 LAS Catalog 支持,需要手动上传 Connector。

0.8.2

Flink-1.17-volcano

  1. 内置 LAS Catalog
  2. 支持 Procedure 语法

SQL 语法详解

CREATE Catalog

火山引擎 Paimon 支持多种元存储类型,以下是文件系统 Catalog 和 LAS Catalog 的创建方法:

Filesystem Catalog

在此模式下,元数据和表文件都存储在文件系统中。

CREATE CATALOG my_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'tos://<bucket-name>/path/to/warehouse'
);

上述 SQL 语句创建了一个名为 my_catalog 的 Paimon Catalog,数据仓库路径为 tos://<bucket-name>/path/to/warehouse

LAS Catalog

此模式下,元数据存储在 LAS Catalog 的元数据存储服务中,表文件存储在指定的文件系统路径下。

CREATE CATALOG my_hive WITH (
  'type' = 'paimon',
  'metastore' = 'hive',
  -- LAS 元数据服务的 Thrift 接口地址,注意修改其中具体 Region 信息
  'uri' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
  -- Hive Conf 目录,由 Flink 作业开发界面依赖文件模块上传
  'hive-conf-dir' = '/opt/tiger/workdir', 
  -- 存储的桶目录地址
  'warehouse' = 'tos://<bucket-name>/path/to/warehouse'
);

上述 SQL 语句创建了一个名为 my_hive 的 Paimon Catalog,元数据存储在 LAS Catalog 元数据中,数据仓库路径为 tos://<bucket-name>/path/to/warehouse
另外在创建作业的时候需要在作业开发界面上传相应的 hive.xml 文件:
Image
其中 hive.xml 文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.server2.max.start.attempts</name>
    <value>5</value>
  </property>
  <property>
    <name>hive.client.las.region.name</name>
    <value>cn-shanghai</value>
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://lakeformation.las.cn-shanghai.ivolces.com:48869</value>
  </property>
  <property>
    <name>hive.hms.client.is.public.cloud</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.client.las.ak</name>
    <value>__LAS_ACCESS_KEY__</value>
  </property>
  <property>
    <name>hive.client.las.sk</name>
    <value>__LAS_ACCESS_KEY_SECRET__</value>
  </property>
</configuration>

其中注意要填写具体的 LAS Catalog 和账号 AK/SK 信息。然后将 hive.xml 文件通过作业开发-依赖文件。
另外,如果是使用 Flink-1.16-volcano 的话,引擎没有内置 LAS Catalog 的连接器,需要手动上传以下 JAR 包到依赖文件中:

flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT.jar
未知大小

CREATE TABLE

在指定的 Catalog 中,可以使用 SQL 语句创建数据表。以下是创建一个包含主键和分区的表的示例:

-- 创建 Catalog
CREATE CATALOG `paimon_test` WITH (
  'type' = 'paimon',
  'warehouse' = 'tos://<bucket-name>/path/to/warehouse'
);

-- 创建 Database
CREATE DATABASE IF NOT EXISTS `paimon_test`.`test_db`;

-- 创建数据表
CREATE TABLE IF NOT EXISTS `paimon_test`.`test_db`.`doc_result` (
    word varchar,
    cnt bigint,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'bucket' = '4',  -- 控制分桶数量,单个 bucket 推荐存储 2GB 左右数据
    'changelog-producer' = 'input' -- 产生 changelog,用于下游流读
);

通过上述 SQL 语法,用户可以在 Paimon 中创建不同类型的 Catalog 和数据表,以满足不同的业务需求。

SQL INSERT

Paimon 支持通过 Flink SQL 对表进行数据写入操作,主要包括以下几种方式:

  1. 插入数据(INSERT INTO):

使用 INSERT INTO 语句,可以将数据插入到指定的表中。此操作适用于批处理和流式作业。

INSERT INTO my_table
SELECT ...

在流式模式下,默认情况下,Paimon 会在 Flink Sink 中执行数据压缩、快照过期,甚至分区过期(如果已配置)。

  1. 覆写整个表(INSERT OVERWRITE):

对于未分区的表,可以使用 INSERT OVERWRITE 语句来覆盖整个表的数据。

INSERT OVERWRITE my_table
SELECT ...
  1. 覆写分区数据:

对于分区表,可以使用 INSERT OVERWRITE 语句来覆盖特定分区的数据。

INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...)
SELECT ...
  1. 动态分区覆写:

Flink 默认采用动态分区覆写模式,即仅删除被覆写数据中出现的分区。可以通过配置 dynamic-partition-overwrite 参数来切换到静态覆写模式。

-- 动态覆写
INSERT OVERWRITE my_table
SELECT ...
-- 静态覆写(覆写整个表)
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */
SELECT ...
  1. 清空表数据:

可以使用 INSERT OVERWRITE 语句插入空值来清空表数据。

INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
SELECT * FROM my_table WHERE false;
  1. 删除分区数据:

目前,Paimon 支持两种方式来删除分区数据:

  1. 使用 INSERT OVERWRITE 语句向特定分区插入空值,以清空该分区的数据。
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
PARTITION (key1 = value1, key2 = value2, ...)
SELECT selectSpec FROM my_table WHERE false;
  1. 对于需要删除多个分区的情况,可以通过提交 drop_partition 作业来实现。

SQL QUERY

Paimon 支持通过 Flink SQL 对表进行查询操作,主要包括以下方式:
1. 批查询(Batch Query):
在批处理模式下,Paimon 返回表的特定快照数据,默认返回最新快照。

-- 在作业模式中设置为批作业
-- 查询最新快照,其中 print_sink 是 Connector 为 print 的 Sink 表
INSERT INTO print_sink
SELECT * FROM my_table;

注意:因为在流式计算 Flink 产品中,暂不支持直接使用 SELECT 语句进行查询调试,所以如果需要在作业开发界面调试 Query 语句,都必须要增加 INSERT INTO print_sinkprint_sink 建表语句进行数据打印。
另外,为了下文行文方便,统一省略 INSERT INTO 相关调试语句。请在实际应用中结合实际情况进行语句调整。

2. 时间旅行查询(Time Travel Query):
Paimon 支持基于时间或快照 ID 的时间旅行查询,允许用户读取表在特定时间点或快照的状态。

-- 通过快照 ID 查询
SELECT * FROM my_table /*+ OPTIONS('scan.snapshot-id' = '1') */;
-- 通过时间戳查询(Unix 毫秒时间)
SELECT * FROM my_table /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
-- 通过标签查询
SELECT * FROM my_table /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
-- 通过水位线时间戳查询
SELECT * FROM my_table /*+ OPTIONS('scan.watermark' = '1678883047356') */;

3. 增量批查询(Batch Incremental Query):
Paimon 支持增量批查询,允许用户读取表在特定快照范围内的变更数据。

-- 查询快照 ID 范围内的增量数据
SELECT * FROM my_table /*+ OPTIONS('incremental-between' = '12,20') */;

-- 查询时间戳范围内的增量数据
SELECT * FROM my_table /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;

对于需要查看删除记录的情况,可以查询 audit_log 系统表:

SELECT * FROM my_table$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;

4. 流式查询(Streaming Query):
在流处理模式下,Paimon 支持实时读取表的最新变更。默认情况下,流式查询在启动时会读取表的最新快照,并持续读取后续的变更。

-- Flink SQL 作业设置为流式模式
-- 流式查询表数据
SELECT * FROM my_table;

此外,Paimon 还支持在流式查询中指定消费起始位置,例如从特定快照或时间点开始消费:

-- 从指定快照 ID 开始消费
SELECT * FROM my_table /*+ OPTIONS('scan.startup-mode' = 'from-snapshot', 'scan.snapshot-id' = '1') */;

-- 从指定时间戳开始消费
SELECT * FROM my_table /*+ OPTIONS('scan.startup-mode' = 'from-timestamp', 'scan.timestamp-millis' = '1678883047356') */;

通过上述功能,Paimon 为 Flink SQL 查询提供了灵活的支持,满足不同场景下的数据查询需求。

Lookup Join

Paimon 支持在 Flink 中使用 Lookup Join 功能,以丰富流式查询的数据。可以参考如下的流程使用 Paimon 的维表功能:
1. 准备工作:
首先,需要创建一个 Paimon 表,并实时更新该表的数据。

-- 创建 Paimon Catalog
CREATE CATALOG my_catalog WITH (
  'type'='paimon',
  'warehouse'='tos://<bucket-name>/path/to/warehouse' 
);

USE CATALOG my_catalog;

-- 在 Paimon Catalog 中创建表
CREATE TABLE customers (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    country STRING,
    zip STRING
);

-- 启动流式作业以更新 customers 表
INSERT INTO customers ...

-- 创建临时左表,例如来自 Kafka 的数据
CREATE TEMPORARY TABLE orders (
    order_id INT,
    total INT,
    customer_id INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'format' = 'csv'
    ...
);

2. 普通 Lookup Join:
在准备好上述表后,可以在查询中使用 Lookup Join,将实时订单数据与客户信息关联。

-- 使用客户信息丰富每个订单
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

在上述查询中,orders 表是流式数据源,customers 表是维度表,通过 FOR SYSTEM_TIME AS OF 子句实现基于处理时间的关联。
3. 加重试机制的 Lookup Join:
如果在进行 Lookup Join 时,orders 表中的记录由于 customers 表中尚未准备好的数据而导致关联失败,可以考虑使用 Flink 的延迟重试策略(仅适用于 Flink-1.16-volcano 及以上版本)。

-- 使用客户信息丰富每个订单,并在关联失败时进行重试
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

在上述查询中,使用了 Flink 的 Lookup Hint 来指定重试策略,例如固定延迟 1 秒,最多重试 600 次,以应对维度表数据暂时不可用的情况。

Procedure

存储过程(Procedure)语法是为了实现 Compaction、创建分支、Tag、回滚等高级湖管理功能的语法。在 Flink-1.17-volcano 及以上版本支持。

注意:当前语法仅支持 Flink-1.17-volcano 及以上版本。

命令

用法

解释

示例

compact

CALL [catalog.]sys.compact('table')
CALL [catalog.]sys.compact('table', 'partitions')
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by')
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options')
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where')
CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time')

Compact 数据表。参数如下:

  • table(必填):目标表标识符。
  • partitions(可选):分区筛选器。
  • order_strategy(可选):“顺序(order)”“Z 序(zorder)”“希尔伯特(hilbert)”或“无(none)”。
  • order_by(可选):需要排序的列。若“order_strategy”为“none”,则此项留空。
  • options(可选):表的其他动态附加选项。
  • where(可选):分区谓词(不能与“partitions”一起使用)。注意:由于“where”是一个关键字,所以需要像“where”这样加上一对反引号。
  • partition_idle_time(可选):用于对在“partition_idle_time”时长内未接收任何新数据的分区进行完全压缩。并且只有这些分区会被压缩。该参数不能与顺序压缩一起使用。

-- 使用分区选择功能
CALL sys.compact('default.T', 'p=0', 'zorder', 'a,b', 'sink.parallelism=4')
-- 使用分区筛选条件
CALL sys.compact('default.T', 'dt>10 and h<20', 'zorder', 'a,b', 'sink.parallelism=4')

compact_database

CALL [catalog.]sys.compact_database()
CALL [catalog.]sys.compact_database('includingDatabases')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')

Compact 数据库。参数如下:

  • includingDatabases(包含的数据库):用于指定数据库。可使用正则表达式。
  • mode(模式):压缩模式。“divided”(分拆式):为每个表启动一个接收端,检测到新表时需要重启作业;“combined”(合并式,默认模式):针对所有表启动一个合并的接收端,新表将会被自动检测到。
  • includingTables(包含的表):用于指定表。可使用正则表达式。
  • excludingTables(排除的表):用于指定不进行压缩的表。可使用正则表达式。
  • tableOptions(表选项):表的额外动态选项。
  • partition_idle_time(分区闲置时间):该参数用于对在“partition_idle_time”时长内未接收到任何新数据的分区进行完全压缩。并且只有这些分区会被压缩。

CALL sys.compact_database('db1

create_tag

-- 基于指定快照
CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId)

-- 基于最新快照
CALL [catalog.]sys.create_tag('identifier', 'tagName')

基于给定的快照创建标签。参数如下:

  • identifier(标识符):目标表的标识符,不能为空。
  • tagName(标签名):新建标签的名称。
  • snapshotId(长整型):新建标签所基于的快照的 ID(标识号)。
  • time_retained(保留时间):为新创建标签保留的最长时间。

CALL sys.create_tag('default.T', 'my_tag', 10, '1 d')

delete_tag

CALL [catalog.]sys.delete_tag('identifier', 'tagName')

删除标签。参数如下:

  • identifier(标识符):目标表的标识符,不能为空。
  • tagName(标签名):要删除的标签的名称。如果指定多个标签,分隔符为“,”(逗号)。

CALL sys.delete_tag('default.T', 'my_tag')

merge_into

CALL [catalog.]sys.merge_into('identifier','targetAlias',
'sourceSqls','sourceTable','mergeCondition',
'matchedUpsertCondition','matchedUpsertSetting',
'notMatchedInsertCondition','notMatchedInsertValues',
'matchedDeleteCondition')

执行 "MERGE INTO" 语法。 参考 Paimon 社区 merge_into action 文档查看具体用法

-- 对于匹配的订单行,提高价格,并且如果没有匹配项,则插入来自源表的订单。
CALL sys.merge_into('default.T', 'default.S', 'T.id=S.order_id', 'price=T.price+20', '*')

remove_orphan_files

CALL [catalog.]sys.remove_orphan_files('identifier')
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')

删除孤立的数据文件和元数据文件。参数如下:

  • identifier(标识符):目标表的标识符,不能为空,你可以使用 db_name.*的形式来清理整个数据库。
  • olderThan(早于):为避免删除新写入的文件,默认情况下,该操作仅删除早于 1 天的孤立文件。此参数可用于修改时间间隔。
  • dryRun(试运行):当该项设置为“true”(真)时,仅查看孤立文件,而不实际删除文件。默认值为“false”(假)。

CALL remove_orphan_files('default.T', '2023-10-31 12:00:00')
CALL remove_orphan_files('default.*', '2023-10-31 12:00:00')
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', true)

reset_consumer

-- 重置消费者的到下一个快照
CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)
-- 删除消费者
CALL [catalog.]sys.reset_consumer('identifier', 'consumerId')

重置或删除消费者。参数如下:

  • identifier(标识符):目标表的标识符,不能为空。
  • consumerId(消费者 ID):要被重置或删除的消费者。
  • nextSnapshotId(长整型):消费者新的下一个快照 ID。

CALL sys.reset_consumer('default.T', 'myid', 10)

rollback_to

-- 回滚到指定 snapshot
CALL sys.rollback_to(table => 'identifier', snapshot_id => snapshotId)
-- 回滚到指定 tag
CALL sys.rollback_to(table => 'identifier', tag => 'tagName')

回滚到目标表的特定版本。参数:

identifier(标识符):目标表的标识符,不能为空。
snapshotId(长整型):将要回滚到的快照的 ID。
tagName(标签名):将要回滚到的标签名称。

CALL sys.rollback_to(table => 'default.T', snapshot_id => 10)

expire_snapshots

-- 社区 Flink 1.18 版本风格
CALL sys.expire_snapshots(table, retain_max)

使快照过期。参数:

table(表):目标表的标识符,不能为空。
retain_max(最大保留数):要保留的已完成快照的最大数量。
retain_min(最小保留数):要保留的已完成快照的最小数量。
order_than(早于…… 的时间戳):在此时间戳之前的快照将会被移除。
max_deletes(最大删除数量):一次能够删除的快照的最大数量。

CALL sys.expire_snapshots('default.T', 2)

WITH 参数详解

参数

默认值

类型

解释

auto-create

false

Boolean

在读写表时是否创建底层文件/文件夹。

bucket

-1

Integer

文件存储的桶数量。
它应当等于 -1(动态桶模式),或者必须大于 0(固定桶模式)。

bucket-key

(none)

String

指定 Paimon 数据分布策略。数据根据桶键(bucket-key)的哈希值分配到每个桶中。
如果指定多个字段,分隔符为“,”。
如果未指定,则将使用主键;如果没有主键,则将使用整行数据。

cache-page-size

64 kb

MemorySize

用于缓存的内存页 Page 大小

changelog-producer

none

Enum

是否双写到变更日志文件中。该变更日志文件记录了数据变更的详细信息,可以在流式读取时直接使用。此功能适用于具有主键的表。
可能的值:

  • "none":不生成变更日志文件。
  • "input":在刷新内存表时双写到变更日志文件,变更日志来源于输入数据。
  • "full-compaction":每次完整压缩时生成变更日志文件。
  • "lookup":在提交数据写入前通过“查找”生成变更日志文件。

changelog-producer.row-deduplicate

false

Boolean

是否为相同记录生成 -U+U 类型的变更日志。此配置仅在 changelog-producerlookupfull-compaction 时有效。

changelog.num-retained.max

(none)

Integer

保留的已完成变更日志的最大数量。应大于或等于 changelog.num-retained.min

changelog.num-retained.min

(none)

Integer

保留的已完成变更日志的最小数量。应大于或等于 1。

changelog.time-retained

(none)

Duration

已完成变更日志的最大保留时间

commit.callback.#.param

(none)

String

用于类构造函数的参数字符串,回调类应自行解析该参数

commit.callbacks

(none)

String

提交成功后调用的一组提交回调类。类名通过逗号连接(例如:com.test.CallbackA,com.sample.CallbackB)。

commit.force-compact

false

Boolean

是否在提交前强制进行 compaction。

commit.force-create-snapshot

false

Boolean

是否在提交前强制创建快照。

compaction.max-size-amplification-percent

200

Integer

对于变更日志模式表,合并树中存储单个字节数据所需的额外存储量(以百分比表示)。

compaction.max.file-num

50

Integer

对于文件集 [f_0,...,f_N],即使总和(size(f_i))小于目标文件大小,触发仅追加表的 compaction 的最大文件数量。此值可避免挂起过多小文件,从而降低性能。

compaction.min.file-num

5

Integer

对于文件集 [f_0,...,f_N],对于仅追加表,满足总和(size(f_i))>= 目标文件大小以触发 compaction 的最小文件数量。此值避免几乎满的文件被 compaction,这是不划算的。

compaction.optimization-interval

(none)

Duration

执行优化 compaction 的频率,此配置用于确保读优化系统表的查询及时性。

compaction.size-ratio

1

Integer

对于变更日志模式表,在比较排序运行大小时的百分比灵活性。如果候选排序运行(s)的大小比下一个排序运行的大小小 1%,则将下一个排序运行包含在此候选集中。

consumer-id

(none)

String

用于记录存储中消费偏移量的消费者 ID。

consumer.expiration-time

(none)

Duration

消费者文件的过期间隔。如果自上次修改后的生存期超过此值,消费者文件将过期。

consumer.ignore-progress

false

Boolean

对于新启动的作业是否忽略消费者进度。

consumer.mode

exactly-once

Enum

指定表的消费者一致性模式。
可能的值:

  • "exactly-once":读者以快照粒度消费数据,并严格确保消费者中记录的快照 ID 是所有读者已确切消费的快照 ID + 1
  • "at-least-once":每个读者以不同的速率消费快照,并且所有读者中消费进度最慢的快照将记录在消费者中。

continuous.discovery-interval

10 s

Duration

持续读取文件的发现间隔时间,默认 10 秒会检测是否有文件变化。

cross-partition-upsert.bootstrap-parallelism

10

Integer

跨分区更新插入的单个任务中引导的并行度。

cross-partition-upsert.index-ttl

(none)

Duration

跨分区更新插入(主键不包含所有分区字段)在 rocksdb 索引中的 TTL,这可以避免维护过多索引并导致性能越来越差,但请注意,这也可能导致数据重复。

delete.force-produce-changelog

false

Boolean

在删除 SQL 中强制生成变更日志,或者您可以使用“streaming-read-overwrite”从覆盖提交中读取变更日志。

deletion-vectors.enabled

false

Boolean

是否启用删除向量模式。在此模式下,写入数据时会生成包含删除向量的索引文件,标记要删除的数据。在读取操作期间,通过应用这些索引文件,可以避免合并。

dynamic-bucket.assigner-parallelism

(none)

Integer

动态桶模式下分配器操作的并行度,它与初始化的桶数量有关,过小会导致分配器处理速度不足。

dynamic-bucket.initial-buckets

(none)

Integer

动态桶模式下分配器操作中一个分区的初始桶数。

dynamic-bucket.target-row-num

2000000

Long

如果桶为 -1,对于主键表,为动态桶模式,此选项控制一个桶的目标行数。

dynamic-partition-overwrite

true

Boolean

在覆盖具有动态分区列的分区表时是否仅覆盖动态分区。仅在表具有分区键时有效。

file-index.in-manifest-threshold

500 bytes

MemorySize

在清单中存储文件索引字节的阈值。

file-index.read.enabled

true

Boolean

是否启用读取文件索引。

file-reader-async-threshold

10 mb

MemorySize

读取文件异步的阈值。

file.compression

(none)

String

默认文件压缩格式

  • orc 为 lz4
  • parquet 为 snappy。它可以被 file.compression.per.level 覆盖

file.compression.per.level

Map

为不同级别定义不同的压缩策略,您可以这样添加配置:'file.compression.per.level' = '0:lz4,1:zstd'。

file.format

orc

Enum

指定数据文件的消息格式,当前支持 orc、parquet 和 avro 格式。
可能的值:

  • “orc”:ORC 文件格式。
  • “parquet”:Parquet 文件格式。
  • “avro”:Avro 文件格式。

file.format.per.level

Map

为不同级别定义不同的文件格式,您可以这样添加配置:'file.format.per.level' = '0:avro,3:parquet',如果未为级别提供文件格式,则将使用 file.format 设置的默认格式。

full-compaction.delta-commits

(none)

Integer

在增量提交多少次之后会定时的进行 Full Compaction。默认不会有这个行为

ignore-delete

false

Boolean

是否需要忽略删除记录行为,默认是不忽略。

incremental-between

(none)

String

在起始快照(排他)和结束快照之间读取增量更改,例如,'5,10' 表示快照 5 和快照 10 之间的更改。

incremental-between-scan-mode

auto

Enum

在起始快照(排他)和结束快照之间读取增量更改时的扫描类型。
可能的值:

  • "auto":扫描为表生成变更日志文件的变更日志文件。否则,扫描新更改的文件。
  • "delta":扫描快照之间新更改的文件。
  • "changelog":扫描快照之间的变更日志文件。

incremental-between-timestamp

(none)

String

在起始时间戳(排他)和结束时间戳之间读取增量更改,例如,'t1,t2' 表示时间戳 t1 和时间戳 t2 之间的更改。

local-merge-buffer-size

(none)

MemorySize

本地合并将在输入记录按桶(bucket)Shuffle 并写入接收器(sink)之前对其进行缓冲和合并。当缓冲区满时将被刷新。主要用于解决主键上的数据倾斜问题。我们建议在试用此功能时从 64 兆字节(64mb)开始。

local-sort.max-num-file-handles

128

Integer

外部合并排序的最大扇入。它限制了文件句柄的数量。如果太小,可能导致中间合并。但如果太大,会导致同时打开太多文件,消耗内存并导致随机读取。

lookup.cache-file-retention

1 h

Duration

查找的缓存文件保留时间。文件过期后,如果需要访问,将从 DFS 重新读取以在本地磁盘上构建索引。

lookup.cache-max-disk-size

infinite

MemorySize

Lookup 缓存的最大磁盘大小,您可以使用此选项限制本地磁盘的使用。

lookup.cache-max-memory-size

256 mb

MemorySize

Lookup 缓存的最大内存大小。

lookup.cache-spill-compression

"lz4"

String

Lookup 缓存的 spill 文件压缩,当前支持nonelz4lzozstd

lookup.cache.bloom.filter.enabled

true

Boolean

是否为 Lookup 缓存启用布隆过滤器。

lookup.cache.bloom.filter.fpp

0.05

Double

为查找缓存布隆过滤器定义默认的误报概率。

lookup.hash-load-factor

0.75

Float

查找的索引加载因子。

manifest.format

avro

Enum

指定 manifest 文件的消息格式。
可能的值:

  • "orc":ORC 文件格式。
  • "parquet":Parquet 文件格式。
  • "avro":Avro 文件格式。

manifest.full-compaction-threshold-size

16 mb

MemorySize

触发清单 full compaction 的大小阈值。

manifest.merge-min-count

30

Integer

为避免频繁的 manifest 文件合并,此参数指定要合并的 ManifestFileMeta 的最小数量。

manifest.target-file-size

8 mb

MemorySize

建议的 manifest 文件大小。

merge-engine

deduplicate

Enum

为具有主键的表指定合并引擎。
可能的值:

  • 去重(deduplicate):去重并保留最后一行。
  • 部分列更新(partial-update):部分更新非空字段。
  • 预聚合(aggregation):预聚合具有相同主键的字段。
  • 首行(first-row):去重并保留第一行。

metadata.stats-mode

"truncate(16)"

String

元数据统计信息收集的模式。可用的模式有 nonecountstruncate(16)full。

  • "none":表示禁用元数据统计信息收集。
  • "counts" 表示仅收集空值计数。
  • "full":表示收集空值计数、最小值/最大值。
  • "truncate(16)":表示收集空值计数、截断长度为 16 的最小值/最大值。
  • 可以通过 fields.{field_name}.stats-mode 指定字段级别的统计模式

metastore.partitioned-table

false

Boolean

是否在元存储中将此表创建为分区表。例如,如果您想在 Hive 中列出 Paimon 表的所有分区,则需要在 Hive 元存储中将此表创建为分区表。此配置选项不影响默认的文件系统元存储。

metastore.tag-to-partition

(none)

String

是否在元存储中为此表创建用于映射非分区表标签的分区表。这允许 Hive 引擎以分区表视图查看此表,并使用分区字段读取特定分区(特定标签)。

metastore.tag-to-partition.preview

none

Enum

是否在元存储中预览生成的快照的标签。这允许 Hive 引擎在创建之前查询特定标签。
可能的值:

  • "none":无自动创建的标签。
  • "process-time":基于机器的时间,处理时间超过周期时间加上延迟时创建标签。
  • "watermark":基于输入的水印,水印超过周期时间加上延迟时创建标签。
  • "batch":在批处理场景中,任务完成后生成当前快照对应的标签。

num-levels

(none)

Integer

总级别数,例如,有 3 个级别,包括 0、1、2 级别。

num-sorted-run.compaction-trigger

5

Integer

触发压缩操作的排序运行数量。包括第 0 层文件(每个文件为一个排序运行 )和高层级文件(每个层级为一个排序运行)。

num-sorted-run.stop-trigger

(none)

Integer

触发停止写入的排序运行数量,默认值为 'num-sorted-run.compaction-trigger' + 3。

page-size

64 kb

MemorySize

内存页大小。

parquet.enable.dictionary

(none)

Integer

为 parquet 中的所有字段关闭字典编码。

partition

(none)

String

通过表选项定义分区,不能同时在 DDL 和表选项上定义分区。

partition.default-name

"DEFAULT_PARTITION"

String

在动态分区列值为 null/空字符串的情况下的默认分区名称。

partition.expiration-check-interval

1 h

Duration

分区过期的检查间隔。

partition.expiration-time

(none)

Duration

分区的过期间隔。如果分区的生存期超过此值,分区将过期。分区时间从分区值中提取。

partition.timestamp-formatter

(none)

String

从字符串格式化时间戳的格式化程序。它可以与 'partition.timestamp-pattern' 一起使用,使用指定的值创建格式化程序。

  • 默认格式化程序是 'yyyy-MM-dd HH:mm:ss' 和 'yyyy-MM-dd'。
  • 支持多个分区字段,如 '$year-$month-$day $hour:00:00'。
  • 时间戳格式化程序与 Java 的 DateTimeFormatter 兼容。

partition.timestamp-pattern

(none)

String

您可以指定一个模式从分区中获取时间戳。格式化程序模式由 'partition.timestamp-formatter' 定义。

  • 默认情况下,从第一个字段读取。
  • 如果分区中的时间戳是一个名为 'dt' 的单个字段,您可以使用 '$dt'。
  • 如果它分布在年、月、日和小时的多个字段中,您可以使用 '$year-$month-$day $hour:00:00'。
  • 如果时间戳在字段 dt 和小时中,您可以使用 '$dt $hour:00:00'。

primary-key

(none)

String

通过表选项定义主键,不能同时在 DDL 和表选项上定义主键。

read.batch-size

1024

Integer

orc 和 parquet 的读取批处理大小。

record-level.expire-time

(none)

Duration

主键表的记录级别过期时间,在压缩时过期,无法保证及时过期记录。您还必须指定 'record-level.time-field'。

record-level.time-field

(none)

String

记录级别过期的时间字段,应为秒 INT 类型。

rowkind.field

(none)

String

为主键表生成行类型的字段,行类型决定哪些数据是 '+I'、'-U'、'+U' 或 '-D'。

scan.bounded.watermark

(none)

Long

有界流模式的结束条件“水印”。当遇到较大的水印快照时,流读取将结束。

scan.file-creation-time-millis

(none)

Long

配置此时间后,仅读取此时间之后创建的数据文件。它独立于快照,但它是不精确的过滤(取决于是否发生压缩)。

scan.manifest.parallelism

(none)

Integer

扫描清单文件的并行度,默认值是 CPU 处理器的大小。注意:扩展

scan.max-splits-per-task

10

Integer

扫描时一个任务应缓存的最大分割大小。如果枚举器中缓存的分割大小大于任务大小乘以此值,扫描器将暂停扫描。

scan.mode

default

Enum

指定源的扫描行为。
可能的值:

  • "default":根据其他表属性确定实际的启动模式。如果设置了"scan.timestamp-millis",实际启动模式将为"from-timestamp",如果设置了"scan.snapshot-id"或"scan.tag-name",实际启动模式将为"from-snapshot"。否则,实际启动模式将为"latest-full"。
  • "latest-full":对于流数据源,首次启动时生成表上的最新快照,并继续读取最新更改。对于批处理源,仅生成最新快照,但不读取新更改。
  • "full":已弃用。与"latest-full"相同。
  • "latest":对于流数据源,在开始时不生成快照,持续读取最新更改。对于批处理源,行为与"latest-full"启动模式相同。
  • "compacted-full":对于流数据源,首次启动时在表的最新 compact 后生成快照,并继续读取最新更改。对于批处理源,仅在最新 compact 后生成快照,但不读取新更改。当启用计划的 full compact 时,选择 full compact 后的快照。
  • "from-timestamp":对于流数据源,从"scan.timestamp-millis"指定的时间戳开始持续读取更改,在开始时不生成快照。对于批处理源,在"scan.timestamp-millis"指定的时间戳生成快照,但不读取新更改。
  • "from-file-creation-time":对于流和批处理源,生成快照并按创建时间过滤数据文件。对于流数据源,首次启动时,并继续读取最新更改。
  • "from-snapshot":对于流数据源,从"scan.snapshot-id"指定的快照开始持续读取更改,在开始时不生成快照。对于批处理源,生成由"scan.snapshot-id"或"scan.tag-name"指定的快照,但不读取新更改。
  • "from-snapshot-full":对于流数据源,首次启动时从"scan.snapshot-id"指定的表上的快照生成,并持续读取更改。对于批处理源,生成由"scan.snapshot-id"指定的快照,但不读取新更改。
  • "incremental":读取起始和结束快照或时间戳之间的增量更改。

scan.plan-sort-partition

false

Boolean

是否按分区字段对计划文件进行排序,这允许您按照分区顺序读取,即使您的分区写入是无序的。
建议您将此用于'append-only'表的流读取。默认情况下,流读取将首先读取完整快照。为了避免分区的无序读取,您可以打开此选项。

scan.snapshot-id

(none)

Long

"from-snapshot"或"from-snapshot-full"扫描模式下使用的可选快照 ID

scan.tag-name

(none)

String

"from-snapshot"扫描模式下使用的可选标签名称

scan.timestamp-millis

(none)

Long

"from-timestamp"扫描模式下使用的可选时间戳。如果没有早于此时间的快照,将选择最早的快照。

scan.watermark

(none)

Long

"from-snapshot"扫描模式下使用的可选水印。如果没有晚于此水印的快照,将抛出异常。

sequence.field

(none)

String

为主键表生成序列号的字段,序列号决定哪些数据是最新的。

sink.watermark-time-zone

"UTC"

String

将长水印值解析为 TIMESTAMP 值的时区。默认值为'UTC',这意味着水印是在 TIMESTAMP 列上定义的,或者未定义。如果水印是在 TIMESTAMP_LTZ 列上定义的,水印的时区是用户配置的时区,该值应该是用户配置的本地时区。选项值可以是全名,如'America/Los_Angeles',也可以是自定义时区 ID,如'GMT-08:00'。

snapshot.expire.clean-empty-directories

true

Boolean

过期快照时是否尝试清理空目录。请注意,尝试清理目录可能会在文件系统中抛出异常,但在大多数情况下不会造成问题。

snapshot.expire.execution-mode

sync

Enum

指定快照过期的执行模式。
可能的值:

  • "sync":同步执行过期。如果文件过多,可能会花费很长时间并阻塞流处理。
  • "async":异步执行过期。如果快照的生成大于删除,将会有文件积压。

snapshot.expire.limit

10

Integer

一次允许过期的最大快照数量。

snapshot.num-retained.max

infinite

Integer

要保留的已完成快照的最大数量。应大于或等于最小数量。

snapshot.num-retained.min

10

Integer

要保留的已完成快照的最小数量。应大于或等于 1。

snapshot.time-retained

1 h

Duration

已完成快照的最大保留时间。

snapshot.watermark-idle-timeout

(none)

Duration

在水印中,如果源在指定的超时持续时间内保持空闲,它会触发快照推进并便于创建标签。

sort-compaction.local-sample.magnification

1000

Integer

排序压缩的本地样本的放大倍数。本地样本的大小是接收器并行度 * 放大倍数。

sort-compaction.range-strategy

QUANTITY

Enum

排序压缩的范围策略,默认值是数量。如果为排序任务分配的数据大小不均匀,可能导致性能瓶颈,可以将配置设置为大小。
可能的值:

  • "SIZE"
  • "QUANTITY"

sort-engine

loser-tree

Enum

为具有主键的表指定排序引擎。
可能的值:

  • "min-heap":使用最小堆进行多路排序。
  • "loser-tree":使用败者树进行多路排序。与堆排序相比,败者树比较次数更少,效率更高。

sort-spill-buffer-size

64 mb

MemorySize

在 spill 磁盘的排序中 spill 记录到磁盘的数据量。

sort-spill-threshold

(none)

Integer

如果排序读取器的最大数量超过此值,将尝试 spill。这可以防止过多的读取器消耗过多内存并导致内存 spill。

source.split.open-file-cost

4 mb

MemorySize

源文件的打开文件成本。用于避免使用源分割读取过多文件,这可能非常慢。

source.split.target-size

128 mb

MemorySize

扫描桶时源分割的目标大小。

spill-compression

"LZ4"

String

溢出的压缩,当前支持 lz4、lzo 和 zstd。

streaming-read-mode

(none)

Enum

流读取的模式,指定读取表文件或日志的数据
可能的值:

  • "log":从日志存储读取。
  • "file":从文件存储读取。

streaming-read-overwrite

false

Boolean

在流模式下是否读取覆盖中的更改。当变更日志生成器为完整压缩或查找时,不能设置为 true,因为这会读取重复的更改。

tag.automatic-creation

none

Enum

是否自动创建 Tag 以及如何生成标签。
可能的值:

  • "none":不自动创建标签。
  • "process-time":基于机器的时间,一旦处理时间超过周期时间加上延迟,创建标签。
  • "watermark":基于输入的水印,一旦水印超过周期时间加上延迟,创建标签。
  • "batch":在批处理场景中,任务完成后生成当前快照对应的标签。

tag.callback.#.param

(none)

String

类 # 的构造函数的参数字符串。回调类应自行解析该参数。

tag.callbacks

(none)

String

成功创建 Tag 后要调用的一组提交回调类。类名用逗号连接(例如:com.test.CallbackA,com.sample.CallbackB)。

tag.creation-delay

0 ms

Duration

周期结束后创建 Tag 之前的延迟时间。这可以允许一些迟到的数据进入标签。

tag.creation-period

daily

Enum

生成标签的频率。
可能的值:

  • "daily":每天生成一个标签。
  • "hourly":每小时生成一个标签。
  • "two-hours":每两小时生成一个标签。

tag.default-time-retained

(none)

Duration

新创建 Tag 的默认最大保留时间。它影响自动创建的 Tag 和手动创建(通过过程)的 Tag 。

tag.num-retained-max

(none)

Integer

要保留的最大 Tag 数量。它仅影响自动创建的 Tag。

tag.period-formatter

with_dashes

Enum

Tag 周期的日期格式。
可能的值:

  • "with_dashes":带破折号的日期和小时,例如,'yyyy-MM-dd HH'
  • "without_dashes":不带破折号的日期和小时,例如,'yyyyMMdd HH'

target-file-size

128 mb

MemorySize

文件的目标大小。

write-buffer-for-append

false

Boolean

此选项仅适用于仅追加表。写入是否使用写入缓冲区以避免内存不足错误。

write-buffer-size

256 mb

MemorySize

在转换为排序的磁盘文件之前在内存中积累的数据量。

write-buffer-spill.max-disk-size

infinite

MemorySize

用于写入缓冲区溢出的最大磁盘大小。仅在启用写入缓冲区溢出时有效

write-buffer-spillable

(none)

Boolean

写入缓冲区是否可溢出。使用对象存储时默认启用。

write-manifest-cache

0 bytes

MemorySize

写入初始化时读取清单文件的缓存大小。

write-max-writers-to-spill

5

Integer

在批量追加插入时,如果写入器数量大于此选项,我们将打开缓冲区缓存和溢出功能以避免内存不足。

write-only

false

Boolean

如果设置为 true,将跳过 compaction 和快照过期。此选项开启后建议设置专门的 compaction 作业完成 compaction 工作。

zorder.var-length-contribution

8

Integer

类型(CHAR、VARCHAR、BINARY、VARBINARY)用于 zorder 排序的字节数。

本节文档主要参考 Apache Paimon 官方文档,更加详细的介绍请参见原始文档。

参考文档

Paimon 是一个活跃且快速发展的社区,强烈建议您关注 Apache Paimon 社区文档,关注最新的变化。