MySQL 是一种广泛使用的开源关系型数据库管理系统,支持高效的数据存储和检索,常用于Web应用和数据分析场景。Flink 提供了两种常见的 MySQL 数据访问方式。首先,Flink JDBC Connector 支持通过 JDBC 连接 MySQL 数据库,实现批量或小规模数据的查询与结果写入。相对的,MySQL CDC (Change Data Capture) Connector 可以捕获 MySQL 数据库中的实时变更,并将其作为流数据输入到 Flink,适用于实时数据同步和更复杂分析场景。
在顶部菜单栏选择目标地域。
在左侧导航栏选择项目管理,然后单击项目区块进入项目。
在任务开发页面单击加号按钮,创建任务。
您也可以选择目标文件夹,直接在该文件夹中创建任务;也可以直接单击 Flink 任务下的 Flink Stream SQL。
在创建任务对话框,设置任务名称、任务类型、所属文件夹、引擎版本等参数,然后单击确定。
配置 | 说明 |
---|---|
任务名称 | 自定义设置任务的名称。 |
任务类型 | 选择 Flink 任务 > Flink Stream > SQL。 |
所属文件夹 | 系统提供文件夹管理功能,用于分类管理任务。在体验任务开发过程中,您可以直接选择系统默认存在的数据开发文件夹。 |
引擎版本 | 目前支持 Flink 1.11-volcano 、Flink 1.16-volcano、Flink 1.17-volcano 版本,请按需选择。 |
任务描述 | 输入任务的描述语句,一般描述任务实现的功能。 |
在任务编辑区编辑 SQL 任务的业务逻辑代码。
此处提供一个示例 SQL 任务代码。代码含义为:新建一个 *mysql-cdc*
源表,获取 MySQL 的实时变更数据,并将变更数据写入下游 Kafka。
create table orders_source ( _id varchar, order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp(3), PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '*******', 'database-name' = 'mydb', 'table-name' = 'orders' ); create table kafka_sink ( _id varchar, order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'topic-test', 'properties.bootstrap.servers' = 'xxx', 'properties.group.id' = 'xxx', 'properties.enable.idempotence' = 'false', 'key.format' = 'json', 'value.format' = 'json' ); INSERT INTO kafka_sink SELECT * FROM orders_source;
单击格式化按钮,系统自动调整SQL代码格式。
系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。
SQL 任务代码编辑完成后,单击验证按钮。
系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
。
代码编辑和验证通过后,单击保存按钮,保存任务代码。
上传成功后,可以参照正常流程进行 SQL 调试、作业上线、启动等功能。
如果上述 MySQL 驱动程序没有上传的话,会遇到如下的错误,关键错误信息如下所示
Caused by: java.lang.IllegalArgumentException: JDBC-Class not found. - com.mysql.jdbc.Driver
具体截图如下:
处理方法: