You need to enable JavaScript to run this app.
导航
Aggregation Functions
最近更新时间:2024.12.03 17:46:09首次发布时间:2023.11.07 10:27:56

Aggregation function 是将一行或多行数据聚合为一个标量值。

如何实现

  1. pom.xml中添加flink-table-common的依赖。

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.11-byted-SNAPSHOT</version>
            <!--设置为 provided,表示依赖关系仅在编译和测试时有效。这意味着在打包的时候不会打入 jar 包。-->
            <scope>provided</scope>
    </dependency>
    
  2. 代码实现。
    通过继承org.apache.flink.table.functions.AggregateFunction实现,必需实现以下三个方法:

    • createAccumulator():生成一个累加器,用来存放聚合状态。

      说明

      创建 accumulator 时,所用的比较器 comparator 不能为 lambda 表达式,最好将 comparator 放在类的成员变量中,因为构造出的对象无法序列化,会导致 Flink 任务失败。

    • accumulate():为每个输入行调用 accumulate() 方法来更新累加器。
    • getValue():处理完所有行后,将调用 getValue() 方法来计算并返回最终结果。
      除以上三个必需的办法外,以下几种方法只在某些情况需要实现。
    • retract():对于有界的 OVER 窗口是必需的。
    • merge():对于很多批处理聚合和会话窗口聚合是必需的。
    • resetAccumulator():对于很多批处理聚合是必需的。
    • getResultType():如果结果类型不是基本类型或简单的 POJO 对象,则是必需的。
    • getAccumulatorType():如果累加器类型不是基本类型或简单的 POJO 对象,则是必需的。

示例 Demo

本文提供一个简单的 Aggregation function 的示例,作用是统计 sum 和 count。

/**
 * Accumulator for WeightedAvg.
 */
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

/**
 * Weighted Average user-defined aggregate function.
 */
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    // 初始化累加器。
    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    // 获取结果。
    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }
    
    // 指明返回值类型,这个例子中返回的是 Long 类型,其实可以不写这个方法,Flink 能自动推断。
    // 此处为了说明该方法的使用,因此加上了。
    // 1. 对于基础类型或 POJO 类型,比如 Integer, Long, String 等类型,其实不需要写这个方法。
    // 2. 对于复杂类型,比如 array, map, row 等类型,必须写这个方法。
    // 3. getAccumulatorType 方法也类似,但是是返回 Accumulator 的类型。
    @Override
        public TypeInformation<Row[]> getResultType() {
            // 本例中是 Long。
                return Types.LONG;
        
        // 以下是几个例子:
        // 1. Map<String, Integer>
        // Types.MAP(Types.STRING, Types.INT)
        // 2. String[], Object 类型的数组
        // Types.OBJECT_ARRAY(Types.STRING)
        // 3. int[] 或 Integer[], 原始类型数组
        // Types.PRIMITIVE_ARRAY(Types.INT);
        // 4. Row 类型, 比如有两列,name varchar, id int.
        // Types.ROW_NAMED(new String[]{"name", "id"}, new TypeInformation[]{Types.STRING, Types.INT})
        // 5. 更多常见的类型见  org.apache.flink.api.common.typeinfo.Types
        }

        // 累加某条消息
    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

        // 撤回某条消息
    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }
    
    // 多累加器合并,例如 session window 就会用到
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }
    
    // 重置累加器
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}

如何使用

如何使用自定义的 Aggregation function,请参见创建自定义函数使用自定义函数