目前自定义函数主要用于 SQL 类型任务。您无需在任务代码中声明自定义函数,即可直接调用。
假设 Flink SQL 任务代码如下:
-- Source table: test rows produced by the datagen connector.
CREATE TABLE orders (
    order_id          BIGINT,
    order_product_id  BIGINT,
    order_customer_id BIGINT,
    order_status      VARCHAR,
    -- Computed column defined as localtimestamp.
    order_update_time AS LOCALTIMESTAMP
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.order_status.length' = '3',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '100',
    'fields.order_product_id.min' = '1',
    'fields.order_product_id.max' = '100',
    'fields.order_customer_id.min' = '1',
    'fields.order_customer_id.max' = '100'
);

-- Sink table: uses the print connector.
CREATE TABLE print_sink (
    new_order_status VARCHAR
) WITH (
    'connector' = 'print'
);

-- The user-defined function is called by its three-part name without any
-- CREATE FUNCTION statement in this job.
-- NOTE(review): the three parts are presumably catalog.database.function -- confirm.
INSERT INTO print_sink
SELECT
    hive.udf_db.flinkudf_test(order_status)
FROM orders
WHERE order_id BETWEEN 1 AND 10000;
其中,`hive.udf_db.flinkudf_test` 为自定义函数的完整引用名称,`hive` 为该函数所在的 Catalog 名称。