You need to enable JavaScript to run this app.
导航
Paimon 写入性能优化
最近更新时间:2025.02.10 10:49:01首次发布时间:2025.02.10 10:49:01

本文档旨在帮助用户系统性理解 Paimon 的写入性能调优技巧,并提供常见的优化建议,以提升写入吞吐量和稳定性。

1. 写入性能优化

Paimon 的写入性能与 Flink 的检查点机制密切相关。以下是一些常见的优化手段:

  • Checkpoint 间隔:增加 Checkpoint 间隔,减少 Checkpoint 频率,从而降低写入开销。

Image

1.2 缓冲区优化

  • 增大写入缓冲区:通过增加 write-buffer-size 来提升单次写入的数据量。
  • 启用可溢写缓冲区:启用 write-buffer-spillable,允许缓冲区在内存不足时将数据溢写到磁盘,避免 OOM。

1.3 桶数量调整

  • 如果使用固定桶模式(Fixed-Bucket Mode),建议根据数据量和集群资源调整桶的数量,避免桶数量过少导致写入热点。建议可以控制每个桶文件大小在 1GB 左右。

1.4 Changelog 生产者配置

  • Changelog 生产者:选项 changelog-producer 可以设置为 lookupfull-compaction,但这两个选项会对写入性能产生较大影响。
    • 快照/全量同步阶段,建议取消这些选项以提升写入性能。
    • 增量同步阶段,可以重新启用这些选项以支持增量数据处理。

1.5 异步 Compaction

  • 如果发现作业输入在背压情况下呈现锯齿状波动,可能是工作节点负载不均衡。此时可以启用异步压实(Asynchronous Compaction),观察吞吐量是否提升。
    • Compaction 操作本质上是异步的,但如果希望它完全异步且不阻塞写入操作,以期实现最大写入吞吐量,那么可以让异步更加平缓的进行操作。可以在表 WITH 参数中设置以下配置:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH (
    -- 开启异步 Compaction
    'num-sorted-run.stop-trigger' = '2147483647', 
    'sort-spill-threshold' = '10', 
    'changelog-producer.lookup-wait' = 'false', 
    ... 
);
  • 这种完全异步的 Compaction 配置会在写入高峰期生成更多小文件,并在写入低谷期逐渐合并这些文件,以实现最优的读取性能。

2. 并行度配置

Paimon 的写入并行度与桶数量密切相关,建议遵循以下原则:

  • Sink 并行度:建议将 Sink 的并行度设置为小于或等于桶的数量,最好与桶数量相等。
  • 配置方式:通过表属性 sink.parallelism 来调整 Sink 的并行度。例如:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('sink.parallelism' = '16', ...);

3. 本地合并(Local Merging)

如果作业存在 主键数据倾斜 问题(例如 AGG 表统计网站页面浏览量时,某些热门页面的访问量远高于其他页面),可以通过以下方式优化:

  • 启用本地合并:设置 local-merge-buffer-size,在数据按桶分区之前对输入记录进行缓冲和合并。
  • 推荐值:建议从 64 MB 开始调整,逐步优化。

注意:本地合并目前不支持 CDC 数据同步。

4. 文件格式选择

Paimon 支持多种文件格式,不同格式在写入性能和查询性能之间存在权衡。

4.1 行存储格式(AVRO)

  • 优点:写入吞吐量高,压缩性能优异。
  • 缺点:查询性能较差,尤其是当表中有大量列但只查询少数列时,IO 开销较大。此外,压缩效率较低,存储成本较高。
  • 适用场景:适合写入密集型场景,不适用于频繁查询的场景。
  • 配置方式
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('file.format' = 'avro', 'metadata.stats-mode' = 'none', ...);

4.2 分层文件格式

  • 如果不想将所有文件改为 AVRO 格式,可以仅将前几层文件设置为 AVRO 格式。例如:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('file.format.per.level' = '0:avro,1:avro', ...);

5. 文件压缩优化

Paimon 默认使用 zstd 压缩算法(级别为 1),可以通过以下方式优化压缩性能:

  • 调整压缩级别:增加 file.compression.zstd-level 以提高压缩率,但会降低读写速度。例如:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('file.compression.zstd-level' = '3', ...);

6. 稳定性优化

6.1 检查点超时

  • 如果桶数量或资源不足,全量压缩(Full-Compaction)可能导致检查点超时。Flink 默认检查点超时时间为 10 分钟,建议根据业务需求适当增加超时时间。例如:

Image

6.2 写入初始化加速

  • 在写入初始化时,桶的写入器需要读取所有历史文件。如果初始化时间过长,可以启用 write-manifest-cache 来缓存读取的清单数据,从而加速初始化。

7. 内存优化

Paimon 写入过程中主要占用内存的地方包括:

  1. 写入缓冲区:通过 write-buffer-size 调整。
  2. 压缩内存:通过 num-sorted-run.compaction-trigger 调整合并的排序运行数量。
  3. 大行数据读取:通过 read.batch-size 减少单次读取的数据量。
  4. ORC 写入内存:通过 orc.write.batch-size 调整 ORC 写入的批次大小。

7.1 字典编码优化

  • Parquet 格式:禁用字典编码以减少内存消耗:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('parquet.enable.dictionary' = 'false', ...);
  • ORC 格式:禁用字典编码或指定特定列禁用:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('orc.dictionary.key.threshold' = '0', 'orc.column.encoding.direct' = 'field1,field2');

7.2 托管内存优化

  • 如果 Flink 作业不依赖状态,可以禁用托管内存以减少内存开销:

Image

  • 或者,使用 Flink 托管内存来管理写入缓冲区:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('sink.use-managed-memory-allocator' = 'true', ...);

8. 提交内存优化

如果写入数据量特别大,提交节点(Committer)可能会消耗大量内存。可以通过以下方式优化:

  • 细粒度资源管理:启用 Flink 的细粒度资源管理功能,并单独增加提交器的堆内存。例如:

Image

CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('sink.committer-memory' = '300m');

9. 推荐配置优化项

通过合理调整 Flink 配置、并行度、文件格式、压缩算法和内存参数,可以显著提升 Paimon 的写入性能和稳定性。建议根据实际业务场景和数据特点,逐步尝试上述优化手段,以达到最佳性能。

最后给出以下在大规模写入场景下,常见的写入性能优化建议配置项:

CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` (
    word varchar, -- 示例字段
    cnt bigint,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'bucket' = 'N',  -- 建议使用固定分层桶,可适当调大分桶数量,单个 bucket 推荐存储 1GB 左右数据
    'sink.parallelism' = 'N', -- 在追数阶段,建议 sink 并行度保持和 bucket 数量一致
    -- 开启异步 Compaction
    'num-sorted-run.stop-trigger' = '2147483647', 
    'sort-spill-threshold' = '10', 
    'changelog-producer.lookup-wait' = 'false',  -- changelog-producer 为 none 的时候这个参数可以忽略
    -- 开启写入缓存 spill 到磁盘,避免 OOM 失败
    'write-buffer-spillable' = 'true',
    'changelog-producer' = 'none' -- 如果下游不依赖 changelog,或者在全量同步阶段,则可以设置为 none
);