You need to enable JavaScript to run this app.
导航
创建 Flink SQL 任务
最近更新时间:2024.04.01 15:26:07首次发布时间:2022.11.11 17:16:16

在 Flink 控制台,您可以创建 Flink SQL 任务,通过简单的 SQL 语句表达您的业务逻辑,就能实现业务。

前提条件

您在开始开发 SQL 任务前,应提前完成以下准备工作:

  • 项目负责人或项目管理员(Project_Admin)已经在项目内创建好 Flink 资源池,请参见创建通用-VCI 资源池
  • 一般是由开发人员(Project_Dev) 负责开发任务,请确保操作员已被添加为项目成员并为其关联角色,请参见添加项目成员

  1. 登录流式计算 Flink 版控制台

  2. 在顶部菜单栏选择目标地域。

  3. 在左侧导航栏选择项目管理,在搜索框中根据项目名称进行模糊搜索,然后单击项目区块进入项目。

  4. 在项目左侧导航栏选择任务开发 > Jupyter lab

  5. 任务开发页面单击加号按钮,创建任务。
    您也可以选择目标文件夹,直接在该文件夹中创建任务;也可以直接单击 Flink 任务下的 Flink Stream SQL

  6. 创建任务对话框,设置任务名称、任务类型、所属文件夹、引擎版本等参数,然后单击确定
    图片

    配置

    说明

    任务名称

    自定义设置任务的名称。
    名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。

    任务类型

    选择 Flink 任务 > Flink Stream > SQL

    所属文件夹

    从下拉列表中选择目标文件夹。
    系统默认存在一个数据开发文件夹,但为了更方便的管理任务,您可以自由创建文件夹。
    如何创建文件夹,请参见管理任务文件夹

    引擎版本

    目前支持 Flink 1.11-volcano 、Flink 1.16-volcano、Flink 1.17-volcano 版本,请按需选择。

    任务描述

    输入任务的描述语句,一般描述任务实现的功能。

  7. 在任务编辑区编辑 SQL 任务的业务逻辑代码。
    任务创建完后默认打开任务的代码编辑区,您可根据实际业务编写 SQL 代码。
    此处提供一个测试用例,可直接使用。

    CREATE TABLE datagen_source (
         siteid INT,
         citycode SMALLINT,
         username STRING,
         pv BIGINT
         )
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '2',                
      'fields.username.length' = '10',            
      'fields.siteid.max' = '10000',   
      'fields.siteid.min' = '1000'        
    );
    CREATE TABLE print_sink (
        siteid INT,
        citycode SMALLINT,
        username STRING,
        pv BIGINT
        )
    WITH (
         'connector' = 'print',
         'print-identifier' = 'out'            
    );
    insert into print_sink
    select * from datagen_source;
    
  8. 单击格式化按钮,系统自动调整SQL代码格式。
    系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。

  9. SQL 任务代码编辑完成后,单击验证按钮。
    系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
    图片

  10. 代码编辑和验证通过后,单击保存按钮,保存任务代码。

  1. 登录流式计算 Flink 版控制台

  2. 在顶部菜单栏选择目标地域。

  3. 在左侧导航栏选择项目管理,在搜索框中根据项目名称进行模糊搜索,然后单击项目区块进入项目。

  4. 在项目左侧导航栏选择任务开发 > Jupyter lab

  5. 任务开发页面单击加号按钮,创建任务。
    您也可以选择目标文件夹,直接在该文件夹中创建任务;也可以直接单击 Flink 任务下的 Flink Batch SQL

  6. 创建任务对话框,设置任务名称、任务类型、所属文件夹、引擎版本等参数,然后单击确定
    图片

    配置

    说明

    任务名称

    自定义设置任务的名称。
    名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。

    任务类型

    选择 Flink 任务 > Flink Batch > SQL

    所属文件夹

    从下拉列表中选择目标文件夹。
    系统默认存在一个数据开发文件夹,但为了更方便的管理任务,您可以自由创建文件夹。如何创建任务文件夹,请参见管理任务文件夹

    引擎版本

    目前支持 Flink 1.11-volcano 、Flink 1.16-volcano、Flink 1.17-volcano 版本,请按需选择。

    任务描述

    输入任务的描述语句,一般描述任务实现的功能。

  7. 在任务编辑区编辑 SQL 作业的业务逻辑代码。
    任务创建完后默认打开任务的代码编辑区,您可根据实际业务编写 SQL 代码。
    此处提供一个测试用例,读取 MySQL 已存在的表格内容,然后通过 Print 连接器打印。

    create table mysql_source (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
     'connector' = 'jdbc',
     'url'='jdbc:mysql://mysq***fb.rds.ivolces.com:3306/gts_autotest',
     'username' = '***test',
     'password' = 'Pw**45!',
     'table-name' = 'orders'
    );
    
    create table print_sink (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp
    ) WITH (
      'connector' = 'print'
    );
    
    insert into print_sink
    select * from mysql_source;
    
  8. 单击格式化按钮,系统自动调整SQL代码格式。
    系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。

  9. SQL 任务代码编辑完成后,单击验证按钮。
    系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
    图片

  10. 代码编辑和验证通过后,单击保存按钮,保存任务代码。