You need to enable JavaScript to run this app.
导航
使用自定义函数
最近更新时间:2024.12.03 17:46:08首次发布时间:2023.07.21 17:25:17

目前自定义函数主要使用于 SQL 类型任务,使用时您无需声明自定义函数,可以直接使用。

假设 Flink SQL 任务代码如下:

create table orders (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time as localtimestamp
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='1',
 'fields.order_status.length' = '3',
 'fields.order_id.min' = '1',
 'fields.order_id.max' = '100',
 'fields.order_product_id.min' = '1',
 'fields.order_product_id.max' = '100',
 'fields.order_customer_id.min' = '1',
 'fields.order_customer_id.max' = '100'
);


create table print_sink (
    new_order_status varchar
) WITH (
  'connector' = 'print'
);


insert into print_sink
select hive.udf_db.flinkudf_test(order_status) from orders  
where order_id between 1 and 10000;
  • hive:即 catalog_name,目前仅支持 Hive Catalog,固定值为hive
  • udf_db:自定义函数所属数据库。
  • flinkudf_test:自定义函数名称。