Aggregation function 是将一行或多行数据聚合为一个标量值。
在pom.xml
中添加flink-table-common
的依赖。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11-byted-SNAPSHOT</version> <!--设置为 provided,表示依赖关系仅在编译和测试时有效。这意味着在打包的时候不会打入 jar 包。--> <scope>provided</scope> </dependency>
代码实现。
通过继承org.apache.flink.table.functions.AggregateFunction
实现,必需实现以下三个方法:
说明
创建 accumulator 时,所用的比较器 comparator 不能为 lambda 表达式,最好将 comparator 放在类的成员变量中,因为构造出的对象无法序列化,会导致 Flink 任务失败。
本文提供一个简单的 Aggregation function 的示例,作用是统计 sum 和 count。
/** * Accumulator for WeightedAvg. */ public static class WeightedAvgAccum { public long sum = 0; public int count = 0; } /** * Weighted Average user-defined aggregate function. */ public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> { // 初始化累加器。 @Override public WeightedAvgAccum createAccumulator() { return new WeightedAvgAccum(); } // 获取结果。 @Override public Long getValue(WeightedAvgAccum acc) { if (acc.count == 0) { return null; } else { return acc.sum / acc.count; } } // 指明返回值类型,这个例子中返回的是 Long 类型,其实可以不写这个方法,Flink 能自动推断。 // 此处为了说明该方法的使用,因此加上了。 // 1. 对于基础类型或 POJO 类型,比如 Integer, Long, String 等类型,其实不需要写这个方法。 // 2. 对于复杂类型,比如 array, map, row 等类型,必须写这个方法。 // 3. getAccumulatorType 方法也类似,但是是返回 Accumulator 的类型。 @Override public TypeInformation<Row[]> getResultType() { // 本例中是 Long。 return Types.LONG; // 以下是几个例子: // 1. Map<String, Integer> // Types.MAP(Types.STRING, Types.INT) // 2. String[], Object 类型的数组 // Types.OBJECT_ARRAY(Types.STRING) // 3. int[] 或 Integer[], 原始类型数组 // Types.PRIMITIVE_ARRAY(Types.INT); // 4. Row 类型, 比如有两列,name varchar, id int. // Types.ROW_NAMED(new String[]{"name", "id"}, new TypeInformation[]{Types.STRING, Types.INT}) // 5. 更多常见的类型见 org.apache.flink.api.common.typeinfo.Types } // 累加某条消息 public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) { acc.sum += iValue * iWeight; acc.count += iWeight; } // 撤回某条消息 public void retract(WeightedAvgAccum acc, long iValue, int iWeight) { acc.sum -= iValue * iWeight; acc.count -= iWeight; } // 多累加器合并,例如 session window 就会用到 public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) { Iterator<WeightedAvgAccum> iter = it.iterator(); while (iter.hasNext()) { WeightedAvgAccum a = iter.next(); acc.count += a.count; acc.sum += a.sum; } } // 重置累加器 public void resetAccumulator(WeightedAvgAccum acc) { acc.count = 0; acc.sum = 0L; } }