Paimon 的 Tag 功能允许用户基于快照创建标签,以保留历史数据。由于 Paimon 表会根据配置自动过期旧的快照,导致历史数据无法查询。通过 Tag 功能,用户可以为特定的快照创建标签,从而保留该快照对应的数据文件和元数据,以便后续查询历史数据。Tag 功能特别适用于需要按天或按小时保留历史数据的场景。
在使用之前,需要注意 Flink 版本要求:
用户可以为指定的快照创建 Tag。Tag 会保留该快照的所有数据文件和元数据,确保历史数据可查询。
语法示例:
CALL sys.create_tag('default.T', 'my_tag', 10, '1 d')
参数说明:
dbName.tableName
格式。用户可以删除不再需要的 Tag。删除 Tag 不会影响其他快照或数据文件。
语法示例:
CALL sys.delete_tag('default.T', 'my_tag')
参数说明:
dbName.tableName
格式。用户可以将表回滚到某个 Tag 对应的快照状态。回滚操作会删除该 Tag 之后的所有快照和 Tag,并清理对应的数据文件。
语法示例:
CALL sys.rollback_to('default.T', 'my_tag')
参数说明:
dbName.tableName
格式。Paimon 支持时间旅行(Time Travel)功能,允许用户查询历史快照或 Tag 对应的数据。
当我们选择批作业开发模式下,以下几种方法是查询指定快照的方法
查询最新快照就按照经典的 Select 语句从表中读取数据,不指定 Tag 和快照 ID 等信息,就是默认从最新快照查询:
SELECT * FROM t;
Paimon 的带时间旅行的批量读取功能可以指定一个快照或者一个标签,并读取相应的数据:
-- 读取 ID 为 1 的快照 SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */; -- 从指定的 Unix 毫秒时间戳读取快照 SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */; -- 从指定的时间戳字符串读取快照,会自动转换为 Unix 毫秒时间戳 -- 支持的格式包括:yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss.SSS,使用默认本地时区 SELECT * FROM t /*+ OPTIONS('scan.timestamp' = '2023-12-09 23:09:12') */; -- 从 watermark 读取快照,将匹配 watermark 之后的第一个快照 SELECT * FROM t /*+ OPTIONS('scan.watermark' = '1678883047356') */;
最常见的是从某一个 Tag 读取数据:
-- 读取标签 'my-tag' SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
读取起始快照(不包括)和结束快照之间的增量变化。例如:
-- 快照 ID 之间的增量变化 SELECT * FROM t /*+ OPTIONS('incremental-between' = 'tag1,tag3') */; -- 快照时间戳(毫秒)之间的增量变化 SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;
默认情况下,将扫描生成变更日志文件的表的变更日志文件。否则,扫描新更改的文件。您也可以强制指定 incremental-between-scan-mode
。在批处理 SQL 中,不允许返回 DELETE 记录,因此 -D 记录将被丢弃。如果您想查看 DELETE 记录,可以使用 audit_log 表:
SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
当我们选择流作业开发模式下,以下几种方法是查询的方法
默认情况下,流式读取首先为当前 Paimon 表生成快照,读取完成后,持续读取最新的变更数据。Paimon 流式模式下的 Paimon 源是无界的,就像一个永无止境的队列。
-- 首先生成消费,读取快照完成后,消费快照数据 SELECT * FROM t; -- 从最新数据开始消费 SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
如果当前 Paimon 表是一个 分区表,如果您只想处理今天及以后的数据,可以使用分区过滤器:
SELECT * FROM t WHERE dt > '2023-06-26';
如果不是一个分区表,或者您无法按分区过滤,可以使用时间旅行的流式读取。
-- 从快照 ID 1L 读取更改 SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */; -- 从指定的时间戳读取快照更改 SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */; -- 在首次启动时读取快照 ID 1L,并继续读取更改 SELECT * FROM t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
时间旅行的流式读取依赖于快照,但默认情况下,快照只保留 1 小时内的数据,这可能会阻止您读取较早的增量数据。因此,Paimon 还提供了另一种流式读取模式 scan.file-creation-time-millis
,它提供了一种粗略的过滤,以保留在 timeMillis 之后生成的文件。
SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
Paimon 支持在写入任务中自动创建 Tag,用户可以通过配置实现按周期自动生成 Tag。
通过表选项 tag.automatic-creation
设置 Tag 的创建模式,支持以下模式:
process-time
:基于机器时间创建 Tag。watermark
:基于 Sink 输入的 Watermark 创建 Tag。batch
:在批处理场景中,任务完成后生成 Tag。如果选择 watermark
模式,且 Watermark 不是 UTC 时间,需配置 sink.watermark-time-zone
。这个参数代表将水印值解析为 TIMESTAMP 值的时区。默认值为'UTC',这意味着水印是在 TIMESTAMP 列上定义的,或者未定义。如果水印是在 TIMESTAMP_LTZ 列上定义的,水印的时区是用户配置的时区,该值应该是用户配置的本地时区。选项值可以是全名,如'America/Los_Angeles',也可以是自定义时区 ID,如'GMT-08:00'。
通过 tag.creation-period
设置 Tag 的创建频率,支持以下选项:
daily
:每天创建一次 Tag。hourly
:每小时创建一次 Tag。two-hours
:每两小时创建一次 Tag。如果需要等待迟到数据,可以配置延迟时间 tag.creation-delay
。
通过 tag.num-retained-max
设置保留的 Tag 数量,超出数量的旧 Tag 会被自动删除。
通过 tag.period-formatter
可以设置自动创建的 Tag 的格式,支持以下两种格式:
with_dashes
:默认值,时间戳中用中划线相连,例如 'yyyy-MM-dd HH'without_dashes
:时间戳不用中划线连接,例如 'yyyyMMdd HH'很多时候 Flink 实时写入 Paimon 数据表,需要和下游的批处理任务调度相配合。当数据新鲜度满足要求后将自动创建 Tag。创建 tag 成功之后,生成以下文件:
# 数据表为 paimon_tag.paimon_tag.tag_table # Tag 格式为 "tag-2025-02-20 15" tos://<bucket>/paimon_tag/paimon_tag.db/tag_table/tag/tag-2025-02-20 15
可以在下游批作业调度任务的时候,检查该文件生成,即可调度下游依赖批作业任务。
以下示例配置表每天 0:10 自动创建 Tag,并保留最近 90 天的 Tag:
CREATE TABLE IF NOT EXISTS paimon_tag.paimon_tag.tag_table ( user_id STRING PRIMARY KEY NOT ENFORCED, ts TIMESTAMP(3), url STRING ) WITH ( 'tag.automatic-creation' = 'process-time', 'tag.creation-period' = 'daily', 'tag.creation-delay' = '10 m', 'tag.num-retained-max' = '90', 'tag.create-success-file' = 'true' -- Tag 生成时候创建 success 文件 );
以下示例配置表每小时处理事件时间达到 00:00 自动创建 Tag,Tag 允许 1 分钟的延迟时间:
-- 一般 Tag 在主键表场景使用较多,我们采用主键表做示例 CREATE TABLE IF NOT EXISTS paimon_tag.paimon_tag.tag_table_watermark082 ( user_id STRING, ts TIMESTAMP(3), url STRING, PRIMARY KEY (`user_id`) NOT ENFORCED ) WITH ( 'tag.automatic-creation' = 'watermark', 'tag.creation-period' = 'hourly', 'tag.num-retained-max' = '30' ); -- 模拟数据,Watermark 设置为 1 分钟延迟数据 CREATE TABLE IF NOT EXISTS datasource ( user_id STRING, ts TIMESTAMP(3), url STRING, WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE ) WITH ( 'connector' = 'datagen' ); INSERT INTO paimon_tag.paimon_tag.tag_table_watermark082 SELECT * FROM datasource;
通过以上功能,Paimon 提供了强大的历史数据管理和查询能力,适用于需要保留和查询历史数据的场景。