Flink CDC 3.x 是 Apache Flink 的新一代 CDC 解决方案,支持高效捕获和处理数据库变更数据,实现全量与增量数据的无缝同步。本手册将指导您如何使用 Flink CDC 3.x 进行 MySQL 到 ByteHouse-CDW 的数据同步任务。
在 ByteHouse-CDW 场景,Flink CDC 3.x 以下功能:
确保您已经:
Flink 版本需 >= 1.16,Flink CDC YAML 内置支持 ByteHouse-CDW 云数仓版。
当前 Flink CDC 支持从 MySQL 自动同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):
注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。
在 Flink 资源管理 - 资源池 - 资源池详情,中查看资源池网段或者安全组。
将以上查出来的网段或者安全组加入 MySQL 白名单即可。
REPLICATION SLAVE
、REPLICATION CLIENT
用以访问 binlog。本文创建 flinkcdc 账号。为了测试的目的,可以使用以下 SQL 创建数据库:
-- create database CREATE DATABASE app_db; USE app_db; -- create orders table CREATE TABLE `orders` ( `id` INT NOT NULL, `price` DECIMAL(10,2) NOT NULL, PRIMARY KEY (`id`) ); -- create shipments table CREATE TABLE `shipments` ( `id` INT NOT NULL, `city` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- create products table CREATE TABLE `products` ( `id` INT NOT NULL, `product` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) );
可以通过如下 SQL 导入部分 Demo 数据:
-- insert records INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00); -- insert shipments INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian'); -- insert products INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
在此步骤操作前,请确保 ByteHouse 和 Flink 资源池 VPC 相同,并且网络可以联通(可以参考 调试网络连通性),并且设置了合理的权限,以及签发了正确的 API-KEY。
手动创建数据库表:ByteHouse-CDW 目前不支持自动建表,原因是 Bytehouse CDW 建表参数较多,所以 ByteHouse 有两种方式建表建议
注意:对于存量的 MySQL 实例,可以参考如下命令导出所有数据库建表语句
mysqldump -u <mysql_user> -p --no-data --skip-opt <target_db> > ddl.sql
操作路径:作业开发 - Flink CDC 作业 - 创建作业
参考文档:开发 Flink CDC 任务(Beta)
在 CDC 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 CDC 任务的参数配置 - 依赖文件中。
Flink CDC 具体的配置可以参考 Bytehouse CDW的参考文档。其中核心的任务配置如下:
################################################################################ # Description: Sync MySQL all tables to BH CDW ################################################################################ source: type: mysql name: MySQL Source hostname: vedbm-xxx.pri.mysql.vedb.ivolces.com port: 3306 username: your-username password: your-password tables: app_db.\.* server-id: 5401-5420 sink: type: bytehouse-cdw name: Bytehouse CDW Sink region: VOLCANO_PRIVATE host: tenant-xxxx-cn-beijing.bytehouse.ivolces.com port: 19000 virtual-warehouse: test api-token: xxx:xxx sink.buffer-flush.interval: 5s sink.buffer-flush.max-rows: 10000 timestamp-offset: -8h pipeline: name: MySQL to Bytehouse CDW Pipeline parallelism: 16
为了保证实时任务在失败的时候能够从状态恢复,需要合理的设置 Flink 的状态提交策略。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下次恢复任务会从这一次状态进行恢复。
Checkpoint 开启如下图,在 作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔 进行设置。
此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适资源池进行上线。
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。
可以进入 ByteHouse-CDW 的 SQL 工作表功能,通过 SQL 查询是否数据已经正确同步到数据库中。
查看 JMJobManager日志,开始切分 split 的关键字如下:
# 开始切分 split 2025-01-12 22:08:06,122 INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner [] - Start splitting table app_db.con_bt_point_info into chunks...
切分完之后,开始下发 snapshot split,其中 app_db.con_bt_point_info:0 表示第一个 split,根据 split 编号可以看到 split 处理的进度:
2025-01-12 22:08:42,644 INFO org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlSnapshotSplit{tableId=app_db.con_bt_point_info, splitId='app_db.con_bt_point_info:0', splitKeyType=[`point_id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} to subtask 1
查看 JobManager 日志查看分配 binlog split 的 subtask index:
2025-01-13 11:16:20,315 INFO org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlBinlogSplit{splitId='binlog-split', tables=[], offset={ts_sec=0, file=binlog.000426, pos=520322, kind=SPECIFIC, gtids=da643d88-c37e-11ef-a8e8-5254ac1c7b41:1-1521914, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, kind=NON_STOPPING, row=0, event=0}, isSuspended=false} to subtask 0
查找 subtask 0 对应的 TM 日志:
搜索 TaskManager 日志里的关键字,查看 binlog 处理进度,其中 file 和 pos 分别是读取的 binlog 文件和位置:
2025-01-13 11:21:19,039 INFO org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader [] - Binlog offset for tables [app_db.shipments, app_db.shipments90, app_db.shipments91, app_db.shipments89, app_db.shipments81, app_db.shipments82, app_db.shipments83, app_db.shipments84, app_db.shipments85, app_db.shipments86, app_db.shipments87, app_db.shipments88, app_db.shipments12] on checkpoint 2: {transaction_id=null, ts_sec=0, file=binlog.000426, pos=687715, kind=SPECIFIC, gtids=da643d88-c37e-11ef-a8e8-5254ac1c7b41:1-1522412, row=0, event=0, server_id=2051901732}
问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
。
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。
问题现象:MySQL 的 datetime 是一个无时区信息的数据类型,很多用户会用这个类型存取本地时间。比如 UTC+8 时区的 2025-01-09 10:00:00
。因为无法确认时区,所以 Flink & ByteHouse-CDW 会在转型过程中默认使用 UTC 时间戳。所以在写入之后,使用 UTC+8 的时间再去查看这个时间结果为 2025-01-09 18:00:00
。导致与 MySQL 中时间出现差异。
解决方案:
timestamp-offset: -8h
,可以在同步任务过程中,自动将时区进行偏移,减去 8 小时。注意这个时区偏移仅对 MySQL Datetime 类型生效。