当您创建了变量时,您可以在 SQL 任务中创建默认的临时表或指定 InMemoryCatalog 创建表时,在 WITH 参数中直接引用变量。
本文用一个简单 Demo,来演示变量的使用方式。
指定域变量优先级高于全局变量。
本文模拟一个读取 MySQL 数据的场景,希望将 MySQL 的用户名和密码以变量的形式呈现,不以明文展示。请提前完成变量的创建,具体操作请参见创建变量。
示例中 MySQL 的用户名和密码以变量的形式呈现。在 SQL 任务中引用变量时格式为${secret_values.variable_name}
,其中secret_values.
为固定前缀,variable_name
为您创建的变量名称。
${secret_values.username}
:引用全局变量 username 变量。${secret_values.password}
:引用全局变量 password 变量。${secret_values.
tablename
}
:引用指定域变量 *tablename *变量。注意
引用指定域变量时,需要在 Flink 参数中选择正确的变量作用域。如果指定的作用域和全局变量中都没有找到引用的变量,任务运行时将报错。
CREATE TEMPORARY TABLE mysql_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.*.*.*:30294/doc_test', 'username' = '${secret_values.username}', //引用全局变量-username变量。 'password' = '${secret_values.password}', //引用全局变量-password变量。 'table-name' = '${secret_values.tablename}' //引用指定域变量-tablename变量。 ); create table print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ('connector' = 'print'); insert into print_sink select * from mysql_source;