You need to enable JavaScript to run this app.
导航
Tag 和时间旅行功能
最近更新时间:2025.03.03 09:41:51首次发布时间:2025.03.03 09:41:51
我的收藏
有用
有用
无用
无用

1. Tag 功能概述

Paimon 的 Tag 功能允许用户基于快照创建标签,以保留历史数据。由于 Paimon 表会根据配置自动过期旧的快照,导致历史数据无法查询。通过 Tag 功能,用户可以为特定的快照创建标签,从而保留该快照对应的数据文件和元数据,以便后续查询历史数据。Tag 功能特别适用于需要按天或按小时保留历史数据的场景。

2. Tag 基础操作

在使用之前,需要注意 Flink 版本要求:

  • Flink 版本需 >= 1.17,支持如存储过程等语法

2.1 创建 Tag

用户可以为指定的快照创建 Tag。Tag 会保留该快照的所有数据文件和元数据,确保历史数据可查询。
语法示例:

CALL sys.create_tag('default.T', 'my_tag', 10, '1 d')

参数说明:

  • identifier(标识符):目标表的标识符,不能为空。形如dbName.tableName格式。
  • tagName(标签名):新建 tag 的名称。
  • snapshotId(长整型):新建标签所基于的快照的 ID(标识号)。null 代表从最新快照创建。
  • time_retained(保留时间):为新创建标签保留的最长时间。

2.2 删除 Tag

用户可以删除不再需要的 Tag。删除 Tag 不会影响其他快照或数据文件。
语法示例:

CALL sys.delete_tag('default.T', 'my_tag')

参数说明:

  • identifier(标识符):目标表的标识符,不能为空。形如dbName.tableName格式。
  • tagName(标签名):新建 tag 的名称。

2.3 回滚到 Tag

用户可以将表回滚到某个 Tag 对应的快照状态。回滚操作会删除该 Tag 之后的所有快照和 Tag,并清理对应的数据文件。
语法示例:

CALL sys.rollback_to('default.T', 'my_tag')

参数说明:

  • identifier(标识符):目标表的标识符,不能为空。形如dbName.tableName格式。
  • tagName(标签名):新建 tag 的名称。

3. 时间旅行操作

Paimon 支持时间旅行(Time Travel)功能,允许用户查询历史快照或 Tag 对应的数据。

3.1 批查询

当我们选择批作业开发模式下,以下几种方法是查询指定快照的方法

3.1.1 查询最新快照数据

查询最新快照就按照经典的 Select 语句从表中读取数据,不指定 Tag 和快照 ID 等信息,就是默认从最新快照查询:

SELECT * FROM t;

3.1.2 批查询 Time Travel

Paimon 的带时间旅行的批量读取功能可以指定一个快照或者一个标签,并读取相应的数据:

-- 读取 ID 为 1 的快照
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;

-- 从指定的 Unix 毫秒时间戳读取快照
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

-- 从指定的时间戳字符串读取快照,会自动转换为 Unix 毫秒时间戳
-- 支持的格式包括:yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss.SSS,使用默认本地时区
SELECT * FROM t /*+ OPTIONS('scan.timestamp' = '2023-12-09 23:09:12') */;

-- 从 watermark 读取快照,将匹配 watermark 之后的第一个快照
SELECT * FROM t /*+ OPTIONS('scan.watermark' = '1678883047356') */;

最常见的是从某一个 Tag 读取数据:

-- 读取标签 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

3.1.3 查询两个 Tag 之间的增量数据

读取起始快照(不包括)和结束快照之间的增量变化。例如:

  • 快照:‘5,10’ 表示快照 5 和快照 10 之间的变化。
  • Tag:‘TAG1,TAG3’ 表示 TAG1 和 TAG3 之间的变化。
-- 快照 ID 之间的增量变化
SELECT * FROM t /*+ OPTIONS('incremental-between' = 'tag1,tag3') */;

-- 快照时间戳(毫秒)之间的增量变化
SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;

默认情况下,将扫描生成变更日志文件的表的变更日志文件。否则,扫描新更改的文件。您也可以强制指定 incremental-between-scan-mode。在批处理 SQL 中,不允许返回 DELETE 记录,因此 -D 记录将被丢弃。如果您想查看 DELETE 记录,可以使用 audit_log 表:

SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;

3.2 流式查询

当我们选择流作业开发模式下,以下几种方法是查询的方法

3.2.1 流式消费 Paimon 表

默认情况下,流式读取首先为当前 Paimon 表生成快照,读取完成后,持续读取最新的变更数据。Paimon 流式模式下的 Paimon 源是无界的,就像一个永无止境的队列。

-- 首先生成消费,读取快照完成后,消费快照数据
SELECT * FROM t;

-- 从最新数据开始消费
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;

3.2.2 流式查询 Time Travel

如果当前 Paimon 表是一个 分区表,如果您只想处理今天及以后的数据,可以使用分区过滤器:

SELECT * FROM t WHERE dt > '2023-06-26';

如果不是一个分区表,或者您无法按分区过滤,可以使用时间旅行的流式读取。

-- 从快照 ID 1L 读取更改
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;

-- 从指定的时间戳读取快照更改
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

-- 在首次启动时读取快照 ID 1L,并继续读取更改
SELECT * FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;

时间旅行的流式读取依赖于快照,但默认情况下,快照只保留 1 小时内的数据,这可能会阻止您读取较早的增量数据。因此,Paimon 还提供了另一种流式读取模式 scan.file-creation-time-millis,它提供了一种粗略的过滤,以保留在 timeMillis 之后生成的文件。

SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;

4. 自动创建 Tag 功能

Paimon 支持在写入任务中自动创建 Tag,用户可以通过配置实现按周期自动生成 Tag。

4.1 配置自动创建 Tag

4.1.1 配置 Tag 自动创建模式

通过表选项 tag.automatic-creation 设置 Tag 的创建模式,支持以下模式:

  • process-time:基于机器时间创建 Tag。
  • watermark:基于 Sink 输入的 Watermark 创建 Tag。
  • batch:在批处理场景中,任务完成后生成 Tag。

如果选择 watermark 模式,且 Watermark 不是 UTC 时间,需配置 sink.watermark-time-zone。这个参数代表将水印值解析为 TIMESTAMP 值的时区。默认值为'UTC',这意味着水印是在 TIMESTAMP 列上定义的,或者未定义。如果水印是在 TIMESTAMP_LTZ 列上定义的,水印的时区是用户配置的时区,该值应该是用户配置的本地时区。选项值可以是全名,如'America/Los_Angeles',也可以是自定义时区 ID,如'GMT-08:00'。

4.1.2 配置 Tag 创建周期

通过 tag.creation-period 设置 Tag 的创建频率,支持以下选项:

  • daily:每天创建一次 Tag。
  • hourly:每小时创建一次 Tag。
  • two-hours:每两小时创建一次 Tag。

如果需要等待迟到数据,可以配置延迟时间 tag.creation-delay

4.1.3 配置 Tag 自动删除

通过 tag.num-retained-max 设置保留的 Tag 数量,超出数量的旧 Tag 会被自动删除。

4.1.4 配置 Tag 格式

通过 tag.period-formatter可以设置自动创建的 Tag 的格式,支持以下两种格式:

  • with_dashes:默认值,时间戳中用中划线相连,例如 'yyyy-MM-dd HH'
  • without_dashes:时间戳不用中划线连接,例如 'yyyyMMdd HH'

4.2 配置 Tag 成功标志文件

很多时候 Flink 实时写入 Paimon 数据表,需要和下游的批处理任务调度相配合。当数据新鲜度满足要求后将自动创建 Tag。创建 tag 成功之后,生成以下文件:

# 数据表为 paimon_tag.paimon_tag.tag_table
# Tag 格式为 "tag-2025-02-20 15"
tos://<bucket>/paimon_tag/paimon_tag.db/tag_table/tag/tag-2025-02-20 15

可以在下游批作业调度任务的时候,检查该文件生成,即可调度下游依赖批作业任务。

4.3 示例配置

4.3.1 按照处理时间生成 Tag

以下示例配置表每天 0:10 自动创建 Tag,并保留最近 90 天的 Tag:

CREATE TABLE IF NOT EXISTS paimon_tag.paimon_tag.tag_table (
    user_id STRING PRIMARY KEY NOT ENFORCED,
    ts TIMESTAMP(3),
    url STRING
) WITH (
    'tag.automatic-creation' = 'process-time',
    'tag.creation-period' = 'daily',
    'tag.creation-delay' = '10 m',
    'tag.num-retained-max' = '90',
    'tag.create-success-file' = 'true' -- Tag 生成时候创建 success 文件
);

4.3.2 按照事件时间水印值生成 Tag

以下示例配置表每小时处理事件时间达到 00:00 自动创建 Tag,Tag 允许 1 分钟的延迟时间:

-- 一般 Tag 在主键表场景使用较多,我们采用主键表做示例
CREATE TABLE IF NOT EXISTS paimon_tag.paimon_tag.tag_table_watermark082 (
    user_id STRING,
    ts TIMESTAMP(3),
    url STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'tag.automatic-creation' = 'watermark',
    'tag.creation-period' = 'hourly',
    'tag.num-retained-max' = '30'
);

-- 模拟数据,Watermark 设置为 1 分钟延迟数据
CREATE TABLE IF NOT EXISTS datasource (
    user_id STRING,
    ts TIMESTAMP(3),
    url STRING,
    WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'datagen'
);

INSERT INTO paimon_tag.paimon_tag.tag_table_watermark082
SELECT * FROM datasource;

通过以上功能,Paimon 提供了强大的历史数据管理和查询能力,适用于需要保留和查询历史数据的场景。