You need to enable JavaScript to run this app.
导航
使用 JDBC 或者 MySQL-CDC 数据源
最近更新时间:2024.10.14 14:18:58首次发布时间:2024.10.14 14:18:58

MySQL 是一种广泛使用的开源关系型数据库管理系统,支持高效的数据存储和检索,常用于Web应用和数据分析场景。Flink 提供了两种常见的 MySQL 数据访问方式。首先,Flink JDBC Connector 支持通过 JDBC 连接 MySQL 数据库,实现批量或小规模数据的查询与结果写入。相对的,MySQL CDC (Change Data Capture) Connector 可以捕获 MySQL 数据库中的实时变更,并将其作为流数据输入到 Flink,适用于实时数据同步和更复杂分析场景。

前提条件
  1. 为保证网络访问安全,本文所使用的云产品服务均使用内网访问方式,因此要求 MySQL 和 Flink 资源池均处于相同地域的同一个 VPC 内。您可以在创建云资源前,先创建私有网络。相关文档,请参见创建私有网络创建子网
  • 基于合规要求,如果需要使用 MySQL 相关功能,需要用户自行上传 MySQL 驱动 。请前往 MySQL 官网 下载8.0.27 版本,并且上传到作业开发依赖文件中。

步骤一:下载 MySQL 驱动程序
  1. 前往 MySQL 官网 ,选择 MySQL 驱动版本 8.0.27,选择操作系统为 “Platform Independent”。

Image

  1. 选择 TAR 包或者 ZIP 包,点击 Download 按钮进行下载。对文件进行解压,找到 mysql-connector-java-8.0.27.jar 文件。
  2. 将解压后的 JAR 包上传到文件管理模块

Image

步骤二:开发 SQL 任务
  1. 登录流式计算 Flink 版控制台

  2. 在顶部菜单栏选择目标地域。

  3. 在左侧导航栏选择项目管理,然后单击项目区块进入项目。

  4. 任务开发页面单击加号按钮,创建任务。
    您也可以选择目标文件夹,直接在该文件夹中创建任务;也可以直接单击 Flink 任务下的 Flink Stream SQL

  5. 创建任务对话框,设置任务名称、任务类型、所属文件夹、引擎版本等参数,然后单击确定
    Image

    配置

    说明

    任务名称

    自定义设置任务的名称。
    名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。

    任务类型

    选择 Flink 任务 > Flink Stream > SQL
    如需体验 JAR 任务的开发流程,请参见开发JAR 作业

    所属文件夹

    系统提供文件夹管理功能,用于分类管理任务。在体验任务开发过程中,您可以直接选择系统默认存在的数据开发文件夹
    如果您有自建文件夹管理任务的要求,请单击创建文件夹的文件夹按钮,然后创建文件夹。

    引擎版本

    目前支持 Flink 1.11-volcano 、Flink 1.16-volcano、Flink 1.17-volcano 版本,请按需选择。

    任务描述

    输入任务的描述语句,一般描述任务实现的功能。

  6. 在任务编辑区编辑 SQL 任务的业务逻辑代码。
    此处提供一个示例 SQL 任务代码。代码含义为:新建一个 *mysql-cdc* 源表,获取 MySQL 的实时变更数据,并将变更数据写入下游 Kafka。

    create table orders_source (
      _id varchar,
      order_id bigint,
      order_product_id bigint,
      order_customer_id bigint,
      order_status varchar,
      order_update_time timestamp(3),
      PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'localhost',
     'port' = '3306',
     'username' = 'root',
     'password' = '*******',
     'database-name' = 'mydb',
     'table-name' = 'orders'
    );
    
    
    create table kafka_sink (
      _id varchar,
      order_id bigint,
      order_product_id bigint,
      order_customer_id bigint,
      order_status varchar,
      order_update_time timestamp,
      PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'topic-test',
      'properties.bootstrap.servers' = 'xxx',
      'properties.group.id' = 'xxx',
      'properties.enable.idempotence' = 'false',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    
    
    INSERT INTO kafka_sink
    SELECT
      *
    FROM
      orders_source;
    
  7. 单击格式化按钮,系统自动调整SQL代码格式。
    系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。

  8. SQL 任务代码编辑完成后,单击验证按钮。
    系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
    Image

  9. 代码编辑和验证通过后,单击保存按钮,保存任务代码。

步骤三:上传 MySQL 驱动程序
  1. 点击编辑器右侧的参数配置按钮,在依赖文件中选择“步骤一” 中上传的 JAR 资源

Image

上传成功后,可以参照正常流程进行 SQL 调试、作业上线、启动等功能。

注意事项

如果上述 MySQL 驱动程序没有上传的话,会遇到如下的错误,关键错误信息如下所示

Caused by: java.lang.IllegalArgumentException: JDBC-Class not found. - com.mysql.jdbc.Driver

具体截图如下:
Image
处理方法:

  1. 检查 MySQL 驱动是否上传成功,并且附加到 SQL 的依赖文件中
  2. 确认步骤一到步骤三都按照文档操作完成后,重启任务就能解决问题