You need to enable JavaScript to run this app.
导航
Table Functions
最近更新时间:2024.12.03 17:46:09首次发布时间:2023.11.07 10:27:50

Table function 的输入可以是 0、1,以及多个标量(scalar values),但是输出的是多行,也可以认为输出的是一个二维表。

如何实现

  1. pom.xml中添加flink-table-common的依赖。

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.11-byted-SNAPSHOT</version>
            <!--设置为 provided,表示依赖关系仅在编译和测试时有效。这意味着在打包的时候不会打入 jar 包。-->
            <scope>provided</scope>
    </dependency>
    
  2. 代码实现。
    通过继承org.apache.flink.table.functions.TableFunction实现,并且需要实现一个或多个 eval 函数。eval 函数输入可以是多个参数。

示例 Demo

本文提供一个简单的 table function 的示例,作用是将一个 Row array 切分为多行。

public class SplitTableFunction extends TableFunction<Row> {
  public void eval(Row[] rows) {
    for (Row row : rows) {
      // collect 一次相当于输入一行。
      collect(row);
    }
  }

  // 如果输入的参数类型是复杂类型,需要通过 getParameterTypes 指定入参的类型。
  @Override
  public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
    TypeInformation[] typeInformation = new TypeInformation[1];
    // 入参是 Row 类型的数组,Row 第一列是 int, 第二列是 String。
    typeInformation[0] = Types.OBJECT_ARRAY(Types.ROW(Types.INT, Types.STRING));
    return typeInformation;
  }

  // 如果返回的数据类型是复杂类型,需要通过 getResultType 指定返回数据的类型。
  @Override
  public TypeInformation<Row> getResultType() {
  // 返回类型是 Row,其中 第一列名称是 id, 类型是 int, 第二列名称是 name, 类型是 String。
    TypeInformation[] typeInformations = new TypeInformation[] {Types.INT, Types.STRING}; 
    String[] names = new String[] {"id", "name"};
    return Types.ROW_NAMED(names, typeInformations);
  }
}

如何使用

如何使用自定义的 Table function,请参见创建自定义函数使用自定义函数