在 Flink 控制台,您可以创建 Flink SQL 任务,通过简单的 SQL 语句表达您的业务逻辑,就能实现业务。
您在开始开发 SQL 任务前,应提前完成以下准备工作:
在顶部菜单栏选择目标地域。
在左侧导航栏选择项目管理,在搜索框中根据项目名称进行模糊搜索,然后单击项目区块进入项目。
在项目左侧导航栏选择任务开发 > Jupyter lab。
在任务开发页面单击加号按钮,创建任务。
您也可以选择目标文件夹,直接在该文件夹中创建任务;也可以直接单击 Flink 任务下的 Flink Stream SQL。
在创建任务对话框,设置任务名称、任务类型、所属文件夹、引擎版本等参数,然后单击确定。
配置 | 说明 |
---|---|
任务名称 | 自定义设置任务的名称。 |
任务类型 | 选择 Flink 任务 > Flink Stream > SQL。 |
所属文件夹 | 从下拉列表中选择目标文件夹。 |
引擎版本 | 目前支持 Flink 1.11-volcano 、Flink 1.16-volcano、Flink 1.17-volcano 版本,请按需选择。 |
任务描述 | 输入任务的描述语句,一般描述任务实现的功能。 |
在任务编辑区编辑 SQL 任务的业务逻辑代码。
任务创建完后默认打开任务的代码编辑区,您可根据实际业务编写 SQL 代码。
此处提供一个测试用例,可直接使用。
CREATE TABLE datagen_source ( siteid INT, citycode SMALLINT, username STRING, pv BIGINT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '2', 'fields.username.length' = '10', 'fields.siteid.max' = '10000', 'fields.siteid.min' = '1000' ); CREATE TABLE print_sink ( siteid INT, citycode SMALLINT, username STRING, pv BIGINT ) WITH ( 'connector' = 'print', 'print-identifier' = 'out' ); insert into print_sink select * from datagen_source;
单击格式化按钮,系统自动调整SQL代码格式。
系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。
SQL 任务代码编辑完成后,单击验证按钮。
系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
。
代码编辑和验证通过后,单击保存按钮,保存任务代码。
在顶部菜单栏选择目标地域。
在左侧导航栏选择项目管理,在搜索框中根据项目名称进行模糊搜索,然后单击项目区块进入项目。
在项目左侧导航栏选择任务开发 > Jupyter lab。
在任务开发页面单击加号按钮,创建任务。
您也可以选择目标文件夹,直接在该文件夹中创建任务;也可以直接单击 Flink 任务下的 Flink Batch SQL。
在创建任务对话框,设置任务名称、任务类型、所属文件夹、引擎版本等参数,然后单击确定。
配置 | 说明 |
---|---|
任务名称 | 自定义设置任务的名称。 |
任务类型 | 选择 Flink 任务 > Flink Batch > SQL。 |
所属文件夹 | 从下拉列表中选择目标文件夹。 |
引擎版本 | 目前支持 Flink 1.11-volcano 、Flink 1.16-volcano、Flink 1.17-volcano 版本,请按需选择。 |
任务描述 | 输入任务的描述语句,一般描述任务实现的功能。 |
在任务编辑区编辑 SQL 作业的业务逻辑代码。
任务创建完后默认打开任务的代码编辑区,您可根据实际业务编写 SQL 代码。
此处提供一个测试用例,读取 MySQL 已存在的表格内容,然后通过 Print 连接器打印。
create table mysql_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://mysq***fb.rds.ivolces.com:3306/gts_autotest', 'username' = '***test', 'password' = 'Pw**45!', 'table-name' = 'orders' ); create table print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'print' ); insert into print_sink select * from mysql_source;
单击格式化按钮,系统自动调整SQL代码格式。
系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。
SQL 任务代码编辑完成后,单击验证按钮。
系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
。
代码编辑和验证通过后,单击保存按钮,保存任务代码。