Apache Paimon 是一个基于流和批处理的实时数据湖解决方案,结合了高效的存储和灵活的计算,专为处理大规模实时数据和流式数据而设计。Paimon 通过集成 Log-Structured Merge Tree (LSM Tree) 结构,实现高效的实时更新和查询能力,支持流批一体化的数据处理模式。其优势在于低延迟、强一致性和易扩展性。
Paimon 具有以下功能特色,更多内容可以参考社区官方文档:
Paimon 提供表抽象,支持批处理和流处理模式下的数据读写操作。
多引擎生态系统支持:除了与 Apache Flink 的深度集成,Paimon 还支持其他计算引擎,如 Apache Hive、Apache Spark、 Trino、 StarRocks 等,提供灵活的数据读写能力。
主键表(Primary Key Table):
主键表为 Paimon 默认表类型,可插入、更新、删除记录。主键由列组成,记录主键值须唯一。Paimon 在存储桶内排序主键确保数据有序,过滤查询主键时性能高。主键表用 LSM 树结构存储数据,支持高效读写。它还支持分区和存储桶机制提升数据管理与查询效率。
非主键表(Non Primary Key Table):
若表未定义主键,默认为追加表。流式处理时,追加表只支持插入完整记录,适用于日志数据同步等无需流式更新的用例。默认无存储桶概念,类似 Hive 表,数据文件按分区组织排序以加速查询。流式写入作业中,追加表有自动小文件合并机制,由 Compact Coordinator 和 Compact Worker 组件扫描合并小文件,提高存储效率与查询性能。
追加队列表为按存储桶划分的队列。同一存储桶内记录按写入顺序排列,流式读取按此顺序传递给下游,适用于需保证数据顺序场景如日志处理、事件溯源。默认所有数据入一个存储桶成一个队列,用户可通过定义参数增加并行度、分散负载。Append Queue 支持自动文件压缩以控制文件数量、提升存储和查询效率,流式读取时 Paimon 会按分区和存储桶顺序确保数据按预期顺序传递。
Paimon 提供多种合并引擎,支持不同的合并机制,适用于不同的场景。以下是主要的合并引擎机制:
对这几种合并引擎的详细介绍如下:
合并引擎类型 | 描述 | 适用场景 |
---|---|---|
去重(Deduplicate) | 默认的合并引擎。Paimon 仅保留具有相同主键的最新记录,丢弃其他记录。特别地,如果最新记录是 | 适用于需要确保主键唯一性,并且只关心最新数据的场景。 |
部分更新(Partial Update) | 允许通过多次更新逐步完善记录的各个列,即根据相同主键,逐个更新字段的最新值,但不会覆盖非空值。默认情况下,部分更新引擎不接受删除记录,可以通过配置 | 适用于需要逐步更新记录的部分字段,而不影响其他字段的场景。适合主键多流 Join 的场景。 |
预聚合(Aggregation) | 用户只关注聚合结果。根据指定的聚合函数,对具有相同主键的记录进行字段级别的聚合。每个非主键字段可以指定聚合函数,未指定的字段默认使用 | 适用于需要对数据进行聚合计算的场景,例如求和、计数等。 |
首行(First Row) | 保留具有相同主键的第一条记录,与去重引擎不同,首行引擎保留最早的记录。这种模式下产出变更日志是 INSERT 流。 | 适用于需要保留首次出现的记录,而忽略后续更新的场景。 |
changelog-producer
参数用于控制表在写入数据时生成变更日志(changelog)的方式。不同的模式适用于不同的应用场景,以下是主要的 changelog-producer
模式及其作用和适用场景:
模式名称 | 作用 | 适用场景 |
---|---|---|
无(None) | 默认模式,不生成额外的变更日志。Paimon 源只能看到跨快照合并后的更改,无法形成完整的变更日志。 | 适用于不需要完整变更日志的消费者,如数据库系统。需要注意的是,Flink 内置的 “Normalize” 操作符会在状态中持久化每个键的值,但这种操作代价高昂,应尽量避免。 |
输入(Input) | 将输入视为完整的变更日志源,所有输入记录将被保存到独立的变更日志文件中,并通过 Paimon 源提供给消费者。 | 适用于输入本身就是完整的变更日志的情况,例如来自数据库的 CDC(变更数据捕获)数据,或由 Flink 有状态计算生成的数据。 |
查找(Lookup) | 在提交数据写入之前,通过查找生成变更日志。Paimon 会在内存和本地磁盘上缓存数据,以生成完整的变更日志。 | 适用于输入无法生成完整变更日志,但希望避免昂贵的 “Normalize” 操作的情况。需要注意的是,Lookup 操作会消耗一定的资源,比较适用于实时性要求较高场景(分钟级) |
完全压缩(Full Compaction) | 通过比较完全压缩之间的结果,生成差异作为变更日志。变更日志的延迟受到完全压缩频率的影响。 | 适用于希望解耦数据写入和变更日志生成的高延迟场景,例如每隔一段时间(如小时级)生成一次变更日志。此模式可以减少资源消耗,但会增加变更日志的延迟。 |
Paimon 版本 | Flink 引擎版本 | 功能说明 |
---|---|---|
0.6 | Flink-1.16-volcano | 没有内置 LAS Catalog 支持,需要手动上传 Connector。 |
0.8.2 | Flink-1.17-volcano |
|
火山引擎 Paimon 支持多种元存储类型,以下是文件系统 Catalog 和 LAS Catalog 的创建方法:
在此模式下,元数据和表文件都存储在文件系统中。
CREATE CATALOG my_catalog WITH ( 'type' = 'paimon', 'warehouse' = 'tos://<bucket-name>/path/to/warehouse' );
上述 SQL 语句创建了一个名为 my_catalog
的 Paimon Catalog,数据仓库路径为 tos://<bucket-name>/path/to/warehouse
。
此模式下,元数据存储在 LAS Catalog 的元数据存储服务中,表文件存储在指定的文件系统路径下。
CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', -- LAS 元数据服务的 Thrift 接口地址,注意修改其中具体 Region 信息 'uri' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869', -- Hive Conf 目录,由 Flink 作业开发界面依赖文件模块上传 'hive-conf-dir' = '/opt/tiger/workdir', -- 存储的桶目录地址 'warehouse' = 'tos://<bucket-name>/path/to/warehouse' );
上述 SQL 语句创建了一个名为 my_hive
的 Paimon Catalog,元数据存储在 LAS Catalog 元数据中,数据仓库路径为 tos://<bucket-name>/path/to/warehouse
。
另外在创建作业的时候需要在作业开发界面上传相应的 hive.xml 文件:
其中 hive.xml 文件内容如下:
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.server2.max.start.attempts</name> <value>5</value> </property> <property> <name>hive.client.las.region.name</name> <value>cn-shanghai</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://lakeformation.las.cn-shanghai.ivolces.com:48869</value> </property> <property> <name>hive.hms.client.is.public.cloud</name> <value>true</value> </property> <property> <name>hive.client.las.ak</name> <value>__LAS_ACCESS_KEY__</value> </property> <property> <name>hive.client.las.sk</name> <value>__LAS_ACCESS_KEY_SECRET__</value> </property> </configuration>
其中注意要填写具体的 LAS Catalog 和账号 AK/SK 信息。然后将 hive.xml 文件通过作业开发-依赖文件。
另外,如果是使用 Flink-1.16-volcano 的话,引擎没有内置 LAS Catalog 的连接器,需要手动上传以下 JAR 包到依赖文件中:
在指定的 Catalog 中,可以使用 SQL 语句创建数据表。以下是创建一个包含主键和分区的表的示例:
-- 创建 Catalog CREATE CATALOG `paimon_test` WITH ( 'type' = 'paimon', 'warehouse' = 'tos://<bucket-name>/path/to/warehouse' ); -- 创建 Database CREATE DATABASE IF NOT EXISTS `paimon_test`.`test_db`; -- 创建数据表 CREATE TABLE IF NOT EXISTS `paimon_test`.`test_db`.`doc_result` ( word varchar, cnt bigint, PRIMARY KEY (word) NOT ENFORCED ) WITH ( 'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 2GB 左右数据 'changelog-producer' = 'input' -- 产生 changelog,用于下游流读 );
通过上述 SQL 语法,用户可以在 Paimon 中创建不同类型的 Catalog 和数据表,以满足不同的业务需求。
Paimon 支持通过 Flink SQL 对表进行数据写入操作,主要包括以下几种方式:
使用 INSERT INTO
语句,可以将数据插入到指定的表中。此操作适用于批处理和流式作业。
INSERT INTO my_table SELECT ...
在流式模式下,默认情况下,Paimon 会在 Flink Sink 中执行数据压缩、快照过期,甚至分区过期(如果已配置)。
对于未分区的表,可以使用 INSERT OVERWRITE
语句来覆盖整个表的数据。
INSERT OVERWRITE my_table SELECT ...
对于分区表,可以使用 INSERT OVERWRITE
语句来覆盖特定分区的数据。
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
Flink 默认采用动态分区覆写模式,即仅删除被覆写数据中出现的分区。可以通过配置 dynamic-partition-overwrite
参数来切换到静态覆写模式。
-- 动态覆写 INSERT OVERWRITE my_table SELECT ... -- 静态覆写(覆写整个表) INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...
可以使用 INSERT OVERWRITE
语句插入空值来清空表数据。
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM my_table WHERE false;
目前,Paimon 支持两种方式来删除分区数据:
INSERT OVERWRITE
语句向特定分区插入空值,以清空该分区的数据。INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM my_table WHERE false;
drop_partition
作业来实现。Paimon 支持通过 Flink SQL 对表进行查询操作,主要包括以下方式:
1. 批查询(Batch Query):
在批处理模式下,Paimon 返回表的特定快照数据,默认返回最新快照。
-- 在作业模式中设置为批作业 -- 查询最新快照,其中 print_sink 是 Connector 为 print 的 Sink 表 INSERT INTO print_sink SELECT * FROM my_table;
注意:因为在流式计算 Flink 产品中,暂不支持直接使用 SELECT 语句进行查询调试,所以如果需要在作业开发界面调试 Query 语句,都必须要增加
INSERT INTO print_sink
和print_sink 建表语句
进行数据打印。
另外,为了下文行文方便,统一省略INSERT INTO
相关调试语句。请在实际应用中结合实际情况进行语句调整。
2. 时间旅行查询(Time Travel Query):
Paimon 支持基于时间或快照 ID 的时间旅行查询,允许用户读取表在特定时间点或快照的状态。
-- 通过快照 ID 查询 SELECT * FROM my_table /*+ OPTIONS('scan.snapshot-id' = '1') */; -- 通过时间戳查询(Unix 毫秒时间) SELECT * FROM my_table /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */; -- 通过标签查询 SELECT * FROM my_table /*+ OPTIONS('scan.tag-name' = 'my-tag') */; -- 通过水位线时间戳查询 SELECT * FROM my_table /*+ OPTIONS('scan.watermark' = '1678883047356') */;
3. 增量批查询(Batch Incremental Query):
Paimon 支持增量批查询,允许用户读取表在特定快照范围内的变更数据。
-- 查询快照 ID 范围内的增量数据 SELECT * FROM my_table /*+ OPTIONS('incremental-between' = '12,20') */; -- 查询时间戳范围内的增量数据 SELECT * FROM my_table /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;
对于需要查看删除记录的情况,可以查询 audit_log
系统表:
SELECT * FROM my_table$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
4. 流式查询(Streaming Query):
在流处理模式下,Paimon 支持实时读取表的最新变更。默认情况下,流式查询在启动时会读取表的最新快照,并持续读取后续的变更。
-- Flink SQL 作业设置为流式模式 -- 流式查询表数据 SELECT * FROM my_table;
此外,Paimon 还支持在流式查询中指定消费起始位置,例如从特定快照或时间点开始消费:
-- 从指定快照 ID 开始消费 SELECT * FROM my_table /*+ OPTIONS('scan.startup-mode' = 'from-snapshot', 'scan.snapshot-id' = '1') */; -- 从指定时间戳开始消费 SELECT * FROM my_table /*+ OPTIONS('scan.startup-mode' = 'from-timestamp', 'scan.timestamp-millis' = '1678883047356') */;
通过上述功能,Paimon 为 Flink SQL 查询提供了灵活的支持,满足不同场景下的数据查询需求。
Paimon 支持在 Flink 中使用 Lookup Join 功能,以丰富流式查询的数据。可以参考如下的流程使用 Paimon 的维表功能:
1. 准备工作:
首先,需要创建一个 Paimon 表,并实时更新该表的数据。
-- 创建 Paimon Catalog CREATE CATALOG my_catalog WITH ( 'type'='paimon', 'warehouse'='tos://<bucket-name>/path/to/warehouse' ); USE CATALOG my_catalog; -- 在 Paimon Catalog 中创建表 CREATE TABLE customers ( id INT PRIMARY KEY NOT ENFORCED, name STRING, country STRING, zip STRING ); -- 启动流式作业以更新 customers 表 INSERT INTO customers ... -- 创建临时左表,例如来自 Kafka 的数据 CREATE TEMPORARY TABLE orders ( order_id INT, total INT, customer_id INT, proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = '...', 'properties.bootstrap.servers' = '...', 'format' = 'csv' ... );
2. 普通 Lookup Join:
在准备好上述表后,可以在查询中使用 Lookup Join,将实时订单数据与客户信息关联。
-- 使用客户信息丰富每个订单 SELECT o.order_id, o.total, c.country, c.zip FROM orders AS o JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;
在上述查询中,orders
表是流式数据源,customers
表是维度表,通过 FOR SYSTEM_TIME AS OF
子句实现基于处理时间的关联。
3. 加重试机制的 Lookup Join:
如果在进行 Lookup Join 时,orders
表中的记录由于 customers
表中尚未准备好的数据而导致关联失败,可以考虑使用 Flink 的延迟重试策略(仅适用于 Flink-1.16-volcano 及以上版本)。
-- 使用客户信息丰富每个订单,并在关联失败时进行重试 SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */ o.order_id, o.total, c.country, c.zip FROM orders AS o JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;
在上述查询中,使用了 Flink 的 Lookup Hint 来指定重试策略,例如固定延迟 1 秒,最多重试 600 次,以应对维度表数据暂时不可用的情况。
存储过程(Procedure)语法是为了实现 Compaction、创建分支、Tag、回滚等高级湖管理功能的语法。在 Flink-1.17-volcano 及以上版本支持。
注意:当前语法仅支持 Flink-1.17-volcano 及以上版本。
命令 | 用法 | 解释 | 示例 |
---|---|---|---|
compact | CALL [catalog.]sys.compact('table') | Compact 数据表。参数如下:
| -- 使用分区选择功能 |
compact_database | CALL [catalog.]sys.compact_database() | Compact 数据库。参数如下:
| CALL sys.compact_database('db1 |
create_tag | -- 基于指定快照 -- 基于最新快照 | 基于给定的快照创建标签。参数如下:
| CALL sys.create_tag('default.T', 'my_tag', 10, '1 d') |
delete_tag | CALL [catalog.]sys.delete_tag('identifier', 'tagName') | 删除标签。参数如下:
| CALL sys.delete_tag('default.T', 'my_tag') |
merge_into | CALL [catalog.]sys.merge_into('identifier','targetAlias', | 执行 "MERGE INTO" 语法。 参考 Paimon 社区 merge_into action 文档查看具体用法 | -- 对于匹配的订单行,提高价格,并且如果没有匹配项,则插入来自源表的订单。 |
remove_orphan_files | CALL [catalog.]sys.remove_orphan_files('identifier') | 删除孤立的数据文件和元数据文件。参数如下:
| CALL remove_orphan_files('default.T', '2023-10-31 12:00:00') |
reset_consumer | -- 重置消费者的到下一个快照 | 重置或删除消费者。参数如下:
| CALL sys.reset_consumer('default.T', 'myid', 10) |
rollback_to | -- 回滚到指定 snapshot | 回滚到目标表的特定版本。参数: identifier(标识符):目标表的标识符,不能为空。 | CALL sys.rollback_to( |
expire_snapshots | -- 社区 Flink 1.18 版本风格 | 使快照过期。参数: table(表):目标表的标识符,不能为空。 | CALL sys.expire_snapshots('default.T', 2) |
参数 | 默认值 | 类型 | 解释 |
---|---|---|---|
auto-create | false | Boolean | 在读写表时是否创建底层文件/文件夹。 |
bucket | -1 | Integer | 文件存储的桶数量。 |
bucket-key | (none) | String | 指定 Paimon 数据分布策略。数据根据桶键( |
cache-page-size | 64 kb | MemorySize | 用于缓存的内存页 Page 大小 |
changelog-producer | none | Enum | 是否双写到变更日志文件中。该变更日志文件记录了数据变更的详细信息,可以在流式读取时直接使用。此功能适用于具有主键的表。
|
changelog-producer.row-deduplicate | false | Boolean | 是否为相同记录生成 |
changelog.num-retained.max | (none) | Integer | 保留的已完成变更日志的最大数量。应大于或等于 |
changelog.num-retained.min | (none) | Integer | 保留的已完成变更日志的最小数量。应大于或等于 1。 |
changelog.time-retained | (none) | Duration | 已完成变更日志的最大保留时间 |
commit.callback.#.param | (none) | String | 用于类构造函数的参数字符串,回调类应自行解析该参数 |
commit.callbacks | (none) | String | 提交成功后调用的一组提交回调类。类名通过逗号连接(例如: |
commit.force-compact | false | Boolean | 是否在提交前强制进行 compaction。 |
commit.force-create-snapshot | false | Boolean | 是否在提交前强制创建快照。 |
compaction.max-size-amplification-percent | 200 | Integer | 对于变更日志模式表,合并树中存储单个字节数据所需的额外存储量(以百分比表示)。 |
compaction.max.file-num | 50 | Integer | 对于文件集 [f_0,...,f_N],即使总和(size(f_i))小于目标文件大小,触发仅追加表的 compaction 的最大文件数量。此值可避免挂起过多小文件,从而降低性能。 |
compaction.min.file-num | 5 | Integer | 对于文件集 [f_0,...,f_N],对于仅追加表,满足总和(size(f_i))>= 目标文件大小以触发 compaction 的最小文件数量。此值避免几乎满的文件被 compaction,这是不划算的。 |
compaction.optimization-interval | (none) | Duration | 执行优化 compaction 的频率,此配置用于确保读优化系统表的查询及时性。 |
compaction.size-ratio | 1 | Integer | 对于变更日志模式表,在比较排序运行大小时的百分比灵活性。如果候选排序运行(s)的大小比下一个排序运行的大小小 1%,则将下一个排序运行包含在此候选集中。 |
consumer-id | (none) | String | 用于记录存储中消费偏移量的消费者 ID。 |
consumer.expiration-time | (none) | Duration | 消费者文件的过期间隔。如果自上次修改后的生存期超过此值,消费者文件将过期。 |
consumer.ignore-progress | false | Boolean | 对于新启动的作业是否忽略消费者进度。 |
consumer.mode | exactly-once | Enum | 指定表的消费者一致性模式。
|
continuous.discovery-interval | 10 s | Duration | 持续读取文件的发现间隔时间,默认 10 秒会检测是否有文件变化。 |
cross-partition-upsert.bootstrap-parallelism | 10 | Integer | 跨分区更新插入的单个任务中引导的并行度。 |
cross-partition-upsert.index-ttl | (none) | Duration | 跨分区更新插入(主键不包含所有分区字段)在 rocksdb 索引中的 TTL,这可以避免维护过多索引并导致性能越来越差,但请注意,这也可能导致数据重复。 |
delete.force-produce-changelog | false | Boolean | 在删除 SQL 中强制生成变更日志,或者您可以使用“streaming-read-overwrite”从覆盖提交中读取变更日志。 |
deletion-vectors.enabled | false | Boolean | 是否启用删除向量模式。在此模式下,写入数据时会生成包含删除向量的索引文件,标记要删除的数据。在读取操作期间,通过应用这些索引文件,可以避免合并。 |
dynamic-bucket.assigner-parallelism | (none) | Integer | 动态桶模式下分配器操作的并行度,它与初始化的桶数量有关,过小会导致分配器处理速度不足。 |
dynamic-bucket.initial-buckets | (none) | Integer | 动态桶模式下分配器操作中一个分区的初始桶数。 |
dynamic-bucket.target-row-num | 2000000 | Long | 如果桶为 -1,对于主键表,为动态桶模式,此选项控制一个桶的目标行数。 |
dynamic-partition-overwrite | true | Boolean | 在覆盖具有动态分区列的分区表时是否仅覆盖动态分区。仅在表具有分区键时有效。 |
file-index.in-manifest-threshold | 500 bytes | MemorySize | 在清单中存储文件索引字节的阈值。 |
file-index.read.enabled | true | Boolean | 是否启用读取文件索引。 |
file-reader-async-threshold | 10 mb | MemorySize | 读取文件异步的阈值。 |
file.compression | (none) | String | 默认文件压缩格式
|
file.compression.per.level | Map | 为不同级别定义不同的压缩策略,您可以这样添加配置:'file.compression.per.level' = '0:lz4,1:zstd'。 | |
file.format | orc | Enum | 指定数据文件的消息格式,当前支持 orc、parquet 和 avro 格式。
|
file.format.per.level | Map | 为不同级别定义不同的文件格式,您可以这样添加配置:'file.format.per.level' = '0:avro,3:parquet',如果未为级别提供文件格式,则将使用 | |
full-compaction.delta-commits | (none) | Integer | 在增量提交多少次之后会定时的进行 Full Compaction。默认不会有这个行为 |
ignore-delete | false | Boolean | 是否需要忽略删除记录行为,默认是不忽略。 |
incremental-between | (none) | String | 在起始快照(排他)和结束快照之间读取增量更改,例如,'5,10' 表示快照 5 和快照 10 之间的更改。 |
incremental-between-scan-mode | auto | Enum | 在起始快照(排他)和结束快照之间读取增量更改时的扫描类型。
|
incremental-between-timestamp | (none) | String | 在起始时间戳(排他)和结束时间戳之间读取增量更改,例如,'t1,t2' 表示时间戳 t1 和时间戳 t2 之间的更改。 |
local-merge-buffer-size | (none) | MemorySize | 本地合并将在输入记录按桶(bucket)Shuffle 并写入接收器(sink)之前对其进行缓冲和合并。当缓冲区满时将被刷新。主要用于解决主键上的数据倾斜问题。我们建议在试用此功能时从 64 兆字节(64mb)开始。 |
local-sort.max-num-file-handles | 128 | Integer | 外部合并排序的最大扇入。它限制了文件句柄的数量。如果太小,可能导致中间合并。但如果太大,会导致同时打开太多文件,消耗内存并导致随机读取。 |
lookup.cache-file-retention | 1 h | Duration | 查找的缓存文件保留时间。文件过期后,如果需要访问,将从 DFS 重新读取以在本地磁盘上构建索引。 |
lookup.cache-max-disk-size | infinite | MemorySize | Lookup 缓存的最大磁盘大小,您可以使用此选项限制本地磁盘的使用。 |
lookup.cache-max-memory-size | 256 mb | MemorySize | Lookup 缓存的最大内存大小。 |
lookup.cache-spill-compression | "lz4" | String | Lookup 缓存的 spill 文件压缩,当前支持 |
lookup.cache.bloom.filter.enabled | true | Boolean | 是否为 Lookup 缓存启用布隆过滤器。 |
lookup.cache.bloom.filter.fpp | 0.05 | Double | 为查找缓存布隆过滤器定义默认的误报概率。 |
lookup.hash-load-factor | 0.75 | Float | 查找的索引加载因子。 |
manifest.format | avro | Enum | 指定 manifest 文件的消息格式。
|
manifest.full-compaction-threshold-size | 16 mb | MemorySize | 触发清单 full compaction 的大小阈值。 |
manifest.merge-min-count | 30 | Integer | 为避免频繁的 manifest 文件合并,此参数指定要合并的 ManifestFileMeta 的最小数量。 |
manifest.target-file-size | 8 mb | MemorySize | 建议的 manifest 文件大小。 |
merge-engine | deduplicate | Enum | 为具有主键的表指定合并引擎。
|
metadata.stats-mode | "truncate(16)" | String | 元数据统计信息收集的模式。可用的模式有
|
metastore.partitioned-table | false | Boolean | 是否在元存储中将此表创建为分区表。例如,如果您想在 Hive 中列出 Paimon 表的所有分区,则需要在 Hive 元存储中将此表创建为分区表。此配置选项不影响默认的文件系统元存储。 |
metastore.tag-to-partition | (none) | String | 是否在元存储中为此表创建用于映射非分区表标签的分区表。这允许 Hive 引擎以分区表视图查看此表,并使用分区字段读取特定分区(特定标签)。 |
metastore.tag-to-partition.preview | none | Enum | 是否在元存储中预览生成的快照的标签。这允许 Hive 引擎在创建之前查询特定标签。
|
num-levels | (none) | Integer | 总级别数,例如,有 3 个级别,包括 0、1、2 级别。 |
num-sorted-run.compaction-trigger | 5 | Integer | 触发压缩操作的排序运行数量。包括第 0 层文件(每个文件为一个排序运行 )和高层级文件(每个层级为一个排序运行)。 |
num-sorted-run.stop-trigger | (none) | Integer | 触发停止写入的排序运行数量,默认值为 'num-sorted-run.compaction-trigger' + 3。 |
page-size | 64 kb | MemorySize | 内存页大小。 |
parquet.enable.dictionary | (none) | Integer | 为 parquet 中的所有字段关闭字典编码。 |
partition | (none) | String | 通过表选项定义分区,不能同时在 DDL 和表选项上定义分区。 |
partition.default-name | "DEFAULT_PARTITION" | String | 在动态分区列值为 null/空字符串的情况下的默认分区名称。 |
partition.expiration-check-interval | 1 h | Duration | 分区过期的检查间隔。 |
partition.expiration-time | (none) | Duration | 分区的过期间隔。如果分区的生存期超过此值,分区将过期。分区时间从分区值中提取。 |
partition.timestamp-formatter | (none) | String | 从字符串格式化时间戳的格式化程序。它可以与 'partition.timestamp-pattern' 一起使用,使用指定的值创建格式化程序。
|
partition.timestamp-pattern | (none) | String | 您可以指定一个模式从分区中获取时间戳。格式化程序模式由 'partition.timestamp-formatter' 定义。
|
primary-key | (none) | String | 通过表选项定义主键,不能同时在 DDL 和表选项上定义主键。 |
read.batch-size | 1024 | Integer | orc 和 parquet 的读取批处理大小。 |
record-level.expire-time | (none) | Duration | 主键表的记录级别过期时间,在压缩时过期,无法保证及时过期记录。您还必须指定 'record-level.time-field'。 |
record-level.time-field | (none) | String | 记录级别过期的时间字段,应为秒 INT 类型。 |
rowkind.field | (none) | String | 为主键表生成行类型的字段,行类型决定哪些数据是 '+I'、'-U'、'+U' 或 '-D'。 |
scan.bounded.watermark | (none) | Long | 有界流模式的结束条件“水印”。当遇到较大的水印快照时,流读取将结束。 |
scan.file-creation-time-millis | (none) | Long | 配置此时间后,仅读取此时间之后创建的数据文件。它独立于快照,但它是不精确的过滤(取决于是否发生压缩)。 |
scan.manifest.parallelism | (none) | Integer | 扫描清单文件的并行度,默认值是 CPU 处理器的大小。注意:扩展 |
scan.max-splits-per-task | 10 | Integer | 扫描时一个任务应缓存的最大分割大小。如果枚举器中缓存的分割大小大于任务大小乘以此值,扫描器将暂停扫描。 |
scan.mode | default | Enum | 指定源的扫描行为。
|
scan.plan-sort-partition | false | Boolean | 是否按分区字段对计划文件进行排序,这允许您按照分区顺序读取,即使您的分区写入是无序的。 |
scan.snapshot-id | (none) | Long | "from-snapshot"或"from-snapshot-full"扫描模式下使用的可选快照 ID |
scan.tag-name | (none) | String | "from-snapshot"扫描模式下使用的可选标签名称 |
scan.timestamp-millis | (none) | Long | "from-timestamp"扫描模式下使用的可选时间戳。如果没有早于此时间的快照,将选择最早的快照。 |
scan.watermark | (none) | Long | "from-snapshot"扫描模式下使用的可选水印。如果没有晚于此水印的快照,将抛出异常。 |
sequence.field | (none) | String | 为主键表生成序列号的字段,序列号决定哪些数据是最新的。 |
sink.watermark-time-zone | "UTC" | String | 将长水印值解析为 TIMESTAMP 值的时区。默认值为'UTC',这意味着水印是在 TIMESTAMP 列上定义的,或者未定义。如果水印是在 TIMESTAMP_LTZ 列上定义的,水印的时区是用户配置的时区,该值应该是用户配置的本地时区。选项值可以是全名,如'America/Los_Angeles',也可以是自定义时区 ID,如'GMT-08:00'。 |
snapshot.expire.clean-empty-directories | true | Boolean | 过期快照时是否尝试清理空目录。请注意,尝试清理目录可能会在文件系统中抛出异常,但在大多数情况下不会造成问题。 |
snapshot.expire.execution-mode | sync | Enum | 指定快照过期的执行模式。
|
snapshot.expire.limit | 10 | Integer | 一次允许过期的最大快照数量。 |
snapshot.num-retained.max | infinite | Integer | 要保留的已完成快照的最大数量。应大于或等于最小数量。 |
snapshot.num-retained.min | 10 | Integer | 要保留的已完成快照的最小数量。应大于或等于 1。 |
snapshot.time-retained | 1 h | Duration | 已完成快照的最大保留时间。 |
snapshot.watermark-idle-timeout | (none) | Duration | 在水印中,如果源在指定的超时持续时间内保持空闲,它会触发快照推进并便于创建标签。 |
sort-compaction.local-sample.magnification | 1000 | Integer | 排序压缩的本地样本的放大倍数。本地样本的大小是接收器并行度 * 放大倍数。 |
sort-compaction.range-strategy | QUANTITY | Enum | 排序压缩的范围策略,默认值是数量。如果为排序任务分配的数据大小不均匀,可能导致性能瓶颈,可以将配置设置为大小。
|
sort-engine | loser-tree | Enum | 为具有主键的表指定排序引擎。
|
sort-spill-buffer-size | 64 mb | MemorySize | 在 spill 磁盘的排序中 spill 记录到磁盘的数据量。 |
sort-spill-threshold | (none) | Integer | 如果排序读取器的最大数量超过此值,将尝试 spill。这可以防止过多的读取器消耗过多内存并导致内存 spill。 |
source.split.open-file-cost | 4 mb | MemorySize | 源文件的打开文件成本。用于避免使用源分割读取过多文件,这可能非常慢。 |
source.split.target-size | 128 mb | MemorySize | 扫描桶时源分割的目标大小。 |
spill-compression | "LZ4" | String | 溢出的压缩,当前支持 lz4、lzo 和 zstd。 |
streaming-read-mode | (none) | Enum | 流读取的模式,指定读取表文件或日志的数据
|
streaming-read-overwrite | false | Boolean | 在流模式下是否读取覆盖中的更改。当变更日志生成器为完整压缩或查找时,不能设置为 true,因为这会读取重复的更改。 |
tag.automatic-creation | none | Enum | 是否自动创建 Tag 以及如何生成标签。
|
tag.callback.#.param | (none) | String | 类 # 的构造函数的参数字符串。回调类应自行解析该参数。 |
tag.callbacks | (none) | String | 成功创建 Tag 后要调用的一组提交回调类。类名用逗号连接(例如:com.test.CallbackA,com.sample.CallbackB)。 |
tag.creation-delay | 0 ms | Duration | 周期结束后创建 Tag 之前的延迟时间。这可以允许一些迟到的数据进入标签。 |
tag.creation-period | daily | Enum | 生成标签的频率。
|
tag.default-time-retained | (none) | Duration | 新创建 Tag 的默认最大保留时间。它影响自动创建的 Tag 和手动创建(通过过程)的 Tag 。 |
tag.num-retained-max | (none) | Integer | 要保留的最大 Tag 数量。它仅影响自动创建的 Tag。 |
tag.period-formatter | with_dashes | Enum | Tag 周期的日期格式。
|
target-file-size | 128 mb | MemorySize | 文件的目标大小。 |
write-buffer-for-append | false | Boolean | 此选项仅适用于仅追加表。写入是否使用写入缓冲区以避免内存不足错误。 |
write-buffer-size | 256 mb | MemorySize | 在转换为排序的磁盘文件之前在内存中积累的数据量。 |
write-buffer-spill.max-disk-size | infinite | MemorySize | 用于写入缓冲区溢出的最大磁盘大小。仅在启用写入缓冲区溢出时有效 |
write-buffer-spillable | (none) | Boolean | 写入缓冲区是否可溢出。使用对象存储时默认启用。 |
write-manifest-cache | 0 bytes | MemorySize | 写入初始化时读取清单文件的缓存大小。 |
write-max-writers-to-spill | 5 | Integer | 在批量追加插入时,如果写入器数量大于此选项,我们将打开缓冲区缓存和溢出功能以避免内存不足。 |
write-only | false | Boolean | 如果设置为 true,将跳过 compaction 和快照过期。此选项开启后建议设置专门的 compaction 作业完成 compaction 工作。 |
zorder.var-length-contribution | 8 | Integer | 类型(CHAR、VARCHAR、BINARY、VARBINARY)用于 zorder 排序的字节数。 |
本节文档主要参考 Apache Paimon 官方文档,更加详细的介绍请参见原始文档。
Paimon 是一个活跃且快速发展的社区,强烈建议您关注 Apache Paimon 社区文档,关注最新的变化。