You need to enable JavaScript to run this app.
导航
Paimon 聚合表功能
最近更新时间:2025.02.18 16:04:09首次发布时间:2025.02.18 16:04:09

1. 概述

Flink Paimon 的 Aggregation 表是一种支持实时聚合计算的表类型。它允许用户在数据写入时对数据进行聚合操作,并将聚合结果直接存储到表中。这种表特别适用于需要实时计算和查询聚合结果的场景,例如用户行为分析、实时监控、实时报表等。聚合表包含以下功能:

  • 实时聚合:在数据写入时,自动对数据进行聚合计算(如求和、最大值、最小值等),并将结果存储到表中。
  • 提升查询性能:由于聚合结果已经预先计算并存储,查询时可以直接读取聚合结果,避免了实时计算的性能开销。
  • 支持回撤(Retract):部分聚合函数支持回撤操作(如 sumcount 等),可以正确处理更新和删除操作。

2. 适用场景

  • 用户行为分析:通过分析网站或 APP 的访问流量,统计用户的访问总时长、访问总次数。
  • 广告效果分析:广告厂商为广告主提供的广告点击总量、展示总量、消费统计等。
  • 电商实时报表:通过分析电商的全年交易数据,获得指定季度或者月份中,各类消费人群的爆款商品。

3. 使用示例

3.1 业务说明

我们有两张表:订单表子订单表。订单表存储了系统的基本订单信息,而子订单表通过订单 ID 与订单表关联。每个订单可能包含多个子订单,因此需要将子订单信息按照订单 ID 聚合,并将多行子订单数据收集到一个嵌套的数组(ARRAY<ROW>)中。实现逻辑如下:
Image

  1. 嵌套表的主键配置

通过 fields.<field-name>.nested-key=pk0,pk1,... 指定嵌套表的主键,这里指定为子订单的 id(如果不指定,会默认附加到子订单数组中,不会去重)。

  1. 价格聚合

订单的总价格等于所有子订单价格的总和。我们可以通过配置 'fields.sum_price.aggregate-function' = 'sum',在子订单入库时自动对价格进行累加。

3.2 业务建模

创建 Paimon 数据表之前,我们先创建 Catalog,这里我们简化测试方案,使用了基于 FileSystem 的 Catalog。也可以参考 Paimon 使用 LAS Catalog 管理元数据 使用 LAS 作为元数据 Catalog 管理方案:

CREATE CATALOG paimon_agg
WITH
  (
    'type' = 'paimon',
    'warehouse' = 'tos://<bucket-name>/paimon_agg'
  );
CREATE DATABASE IF NOT EXISTS paimon_agg.paimon_agg;

创建 Paimon 的聚合表方式如下,注意其中我们使用了日期作为分区字段,而对于 pv 和 last_login_time 进行 sumlast_non_null_value聚合。

-- 订单表
CREATE TABLE IF NOT EXISTS paimon_agg.paimon_agg.orders (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,
  user_name STRING,
  address STRING
);


-- 具有相同order_id的子订单属于同一个订单
CREATE TABLE IF NOT EXISTS paimon_agg.paimon_agg.sub_orders (
  order_id BIGINT,
  sub_order_id INT,
  product_name STRING,
  price BIGINT,
  PRIMARY KEY (order_id, sub_order_id) NOT ENFORCED
);

聚合表的建表语句可以如下 SQL 所示,其中指定了 aggregation 的合并引擎,另外指定了。

  1. 对于sub_orders字段进行了 nested_update聚合,模拟订单中的嵌套表,并且根据子订单 id sub_order_id进行去重更新。
  2. 另外对于子订单的价格进行 sum 聚合,计算订单的总价格。
-- 宽表,将子订单聚合成嵌套表,同时聚合所有子订单的 price 金额
CREATE TABLE IF NOT EXISTS paimon_agg.paimon_agg.order_wide (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,
  user_name STRING,
  address STRING,
  sub_orders ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>,
  sum_price BIGINT
) WITH (
  'merge-engine' = 'aggregation',
  'fields.sub_orders.aggregate-function' = 'nested_update',
  'fields.sub_orders.nested-key' = 'sub_order_id',
  'fields.sum_price.aggregate-function' = 'sum'
);

3.3 聚合写入

聚合写入的 SQL 如下,将多个更新流进行 UINON ALL 操作。另外不同的数据流中要保证不需要更新的字段设置为 NULL,其他字段将会进行更新或者合并:

-- 宽化
INSERT INTO paimon_agg.paimon_agg.order_wide
SELECT * FROM (
  SELECT 
    order_id, 
    user_name,
    address, 
    CAST (NULL AS ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>) ,
    CAST (NULL AS BIGINT) 
  FROM paimon_agg.paimon_agg.orders


  UNION ALL 
    
  SELECT 
    order_id, 
    CAST (NULL AS STRING), 
    CAST (NULL AS STRING), 
    ARRAY[ROW(sub_order_id, product_name, price)],
    price 
  FROM paimon_agg.paimon_agg.sub_orders
);

当数据流构建成功后,我们可以通过以下 SQL 进行测试数据写入:

-- 插入主订单数据
INSERT INTO paimon_agg.paimon_agg.orders VALUES
(1001, 'John Smith', 'No.123 Park Avenue, New York'),
(1002, 'Emma Davis', '456 Ocean Drive, Los Angeles'),
(1003, 'Michael Brown', '789 Lake Street, Chicago'),
(1004, 'Sarah Wilson', '321 Forest Road, Seattle');

-- 插入子订单数据
INSERT INTO paimon_agg.paimon_agg.sub_orders VALUES
-- 订单1001的子订单(包含3个商品)
(1001, 1, 'iPhone 15', 6999),
(1001, 2, 'AirPods Pro', 1999),
(1001, 3, 'Power Adapter', 199),
-- 订单1002的子订单(包含2个商品)
(1002, 1, 'MacBook Pro', 12999),
(1002, 2, 'Laptop Bag', 299),
-- 订单1003的子订单(包含4个商品)
(1003, 1, 'iPad Air', 4799),
(1003, 2, 'Apple Pencil', 999),
(1003, 3, 'Protective Case', 299),
(1003, 4, 'Screen Protector', 99),
-- 订单1004的子订单(包含1个商品)
(1004, 1, 'Apple Watch', 3299);

3.4 验证数据

可以使用 print 的数据下游进行输出:

-- 按照表的嵌套模式打印结果
CREATE TABLE IF NOT EXISTS print_result (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,
  user_name STRING,
  address STRING,
  sub_orders ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>,
  sum_price BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO `print_result`
SELECT * FROM `paimon_agg`.`paimon_agg`.`order_wide`;

验证数据如下表所示,可以看出子订单和订单总金额都正确地得到了聚合:
Image

4. 聚合函数

当前支持的聚合函数和数据类型如下:

  • sum:函数聚合多行的值。它支持 DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT 和 DOUBLE 数据类型。
  • product:函数可以计算多行的乘积值。它支持 DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT 和 DOUBLE 数据类型。
  • count:函数统计多行的值。它支持 INTEGER, BIGINT 数据类型。
  • max:函数识别并保留最大值。它支持 CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP 和 TIMESTAMP_LTZ 数据类型。
  • min:函数识别并保留最小值。它支持 CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP 和 TIMESTAMP_LTZ 数据类型。
  • last_value:函数用最近导入的值替换之前的值。它支持所有数据类型。
  • last_non_null_value:函数用最新的非空值替换之前的值。它支持所有数据类型。
  • listagg:函数将多个字符串值连接成一个字符串。它支持 STRING 数据类型。
  • bool_and:函数评估布尔集合中的所有值是否都为 true。它支持 BOOLEAN 数据类型。
  • bool_or:函数检查布尔集合中是否至少有一个值为 true。它支持 BOOLEAN 数据类型。
  • first_value:函数从数据集中检索第一个空值。它支持所有数据类型。
  • first_non_null_value:函数选择数据集中第一个非空值。它支持所有数据类型。
  • nested_update:函数将多行收集到一个 array中(所谓的“嵌套表”)。它支持 ARRAY数据类型。使用fields.<field-name>.nested-key=pk0,pk1,...指定嵌套表的主键。如果没有键,行将附加到 array中。
  • collect:函数将元素收集到一个数组中。您可以设置fields.<field-name>.distinct=true来去重元素。它仅支持 ARRAY 类型。
  • merge_map:函数合并输入的映射。它仅支持 MAP 类型。

对于流查询,aggregation合并引擎必须与lookupfull-compaction一起使用(input变更日志生产者也支持,但仅返回输入记录)。