You need to enable JavaScript to run this app.
导航
Paimon 使用 Flink CDC 同步 MySQL 数据
最近更新时间:2025.02.10 10:49:01首次发布时间:2025.02.10 10:49:01

1. 概述

Flink CDC 3 是 Apache Flink 的新一代 CDC 解决方案,支持高效捕获和处理数据库变更数据,实现全量与增量数据的无缝同步。本手册将指导您如何使用 Flink CDC 3.x 通过基于 FileSystem 的元数据方案,进行 MySQL 到 Paimon 的数据同步任务。

在 Paimon 场景,Flink CDC 3.x 支持以下功能:

  • 自动建表(仅支持主键表)
  • 表结构变更同步
  • 数据实时同步
  • 同时支持基于文件系统和 LAS 的元数据管理

2. 环境准备

确保您已经:

  1. 开通了流式计算 Flink 版产品,并能够在作业开发中创建 Flink CDC 任务。
  2. 已经在资源管理 - 资源池功能模块购买了按量或者包年包月资源池,可以正常提交 Flink 任务。

Flink 版本需 >= 1.16,Flink CDC YAML 内置支持 Paimon 0.8.2 字节加强版本。

Image

3. MySQL 设置

3.1 基础设置

当前 Flink CDC 支持从 MySQL 自动同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):

  1. Flink CDC 支持 MySQL 8.0+ 版本,5.7+ 版本暂不支持
  2. MySQL 和 Flink 网络需要联通,建议MySQL 和 Flink 使用同一个 VPC 和子网,并且在 MySQL 为 Flink 的安全组设置白名单。参考创建白名单绑定白名单到实例

注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。

Image

  1. 另外需要使用 MySQL 创建访问用户,并且赋予用户只读权限以及 REPLICATION SLAVEREPLICATION CLIENT用以访问 binlog。本文创建 flinkcdc 账号。

Image

3.2 MySQL 样例数据

为了测试的目的,可以使用以下 SQL 创建数据库,并且导入部分 Demo 数据:

-- create database
CREATE DATABASE app_db;

USE app_db;

-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

4. 配置 CDC 同步任务

操作路径:作业开发 - Flink CDC 作业 - 创建作业
参考文档开发 Flink CDC 任务(Beta)

注意:如果选择 LAS Catalog 作为元数据管理方案,需要选择 Paimon-LAS 的数据下游类型。并且需要在 LAS Catalog 提前完成数据目录创建和授权工作。

4.2 上传 MySQL Driver 文件

在 CDC 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 CDC 任务的参数配置 - 依赖文件中。
Image

4.3 使用 FileSystem 的 Catalog

Flink CDC 具体的配置可以参考 Paimon 的参考文档。其中核心的任务配置如下:

source:
  type: mysql
  name: MySQL Source
  hostname: mysqlxxxxxxxxxxxx.rds.ivolces.com
  port: 3306
  # 数据库访问账号密码,需要确保有足够的访问权限(只读+binlog)
  username: flinkcdc
  password: password
  tables: app_db.\.*
  # MySQL 同步任务 id ,建议每个 CDC 任务设置不同的范围区间
  server-id: 5401-5500
  # 数据库服务器中的会话时区,例如 "Asia/Shanghai"。它控制 MYSQL 中的 TIMESTAMP 类型如何转换为 STRING。
  # 如果未设置,则使用 ZoneId.systemDefault() 来确定服务器时区。
  server-time-zone: Asia/Shanghai
   
sink:
  type: paimon
  name: Paimon Sink
  commit.user: flinkcdc
  catalog.properties.metastore: filesystem
  # 表的分桶个数,当前仅支持统一设置
  table.properties.bucket: 1
  catalog.properties.warehouse: tos://<bucket_name>/<catalog_name>

pipeline:
  name: MySQL to Paimon Pipeline
  parallelism: 2
  • <catalog_name>:Catalog 的名称,自定义。
  • <bucket_name>:存储 Paimon 数据的 TOS(对象存储)桶名称。

4.4 使用 LAS 的 Catalog

Flink CDC 具体的配置可以参考 Paimon 的参考文档。其中核心的任务配置如下:

source:
  type: mysql
  name: MySQL Source
  hostname: mysqlxxxxxxxxxxxx.rds.ivolces.com
  port: 3306
  # 数据库访问账号密码,需要确保有足够的访问权限(只读+binlog)
  username: flinkcdc
  password: password
  tables: app_db.\.*
  # MySQL 同步任务 id ,建议每个 CDC 任务设置不同的范围区间
  server-id: 5401-5500
  # 数据库服务器中的会话时区,例如 "Asia/Shanghai"。它控制 MYSQL 中的 TIMESTAMP 类型如何转换为 STRING。
  # 如果未设置,则使用 ZoneId.systemDefault() 来确定服务器时区。
  server-time-zone: Asia/Shanghai

sink:
  type: paimon-las
  name: Paimon LAS Sink
  commit.user: flinkcdc
  catalog.properties.metastore: hive
  catalog.properties.uri: thrift://lakeformation.las.cn-beijing.ivolces.com:48869
  catalog.properties.warehouse: tos://<bucket_name>/<catalog_name>
  catalog.properties.hive-conf-dir: /opt/tiger/workdir
  # 表的分桶个数,当前仅支持统一设置
  table.properties.bucket: 1

pipeline:
  name: MySQL to Paimon LAS Pipeline
  parallelism: 2

注意:使用 Flink CDC 同步数据到 Paimon,元数据使用 LAS 的过程同样要参考 Paimon 使用 LAS Catalog 管理元数据文档中,需要生成 AK/SK、 LAS 开通权限、导入 LAS 依赖文件等过程。

4.5 开启 Checkpoint

Paimon 的快照提交机制与 Flink 的 Checkpoint 紧密关联,因此合理配置 Checkpoint 参数对于确保下游流式或批处理任务能够及时感知上游数据变化至关重要。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下游任务才能观察到最新的数据插入、更新或删除操作。因此,根据业务需求调整 Checkpoint 间隔是优化数据可见性和处理时效性的关键。

注意:可以根据数据实时性要求降低或者提高 Checkpoint 的间隔。一般建议 Checkpoint 时间间隔不低于 1 分钟。如果数据延迟严重、数据更新频繁、数据新鲜度要求不高等场景,建议适当提升 Checkpoint 时间到 5 分钟或者更长。

Checkpoint 开启如下图,在 作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔 进行设置。
Image

5. 上线任务

5.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适资源池进行上线。

Image

上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

5.2 确认任务执行成功

可以进入 TOS 桶管理界面(需要账户有相关访问权限),可以看到已经在数据表的 TOS 目录下,产生了相关的数据文件。则说明数据写入成功。

Image

6. 数据读取示例

以下示例展示了如何使用 Flink SQL 从 Paimon 表中流/批式读取数据。

6.1 创建打印表

创建一个打印表,用于输出读取的数据。

CREATE TABLE `print_orders` (
  `id` INT,
  `price` DECIMAL(10,2),
) WITH (
  'connector' = 'print'
);
  • connector:使用 print 连接器将数据打印到控制台。

6.2 读取 Paimon 表数据

从 Paimon 表中读取数据并写入打印表。

CREATE CATALOG paimon_cdc_test
WITH
  (
    'type' = 'paimon',
    'warehouse' = 'tos://<bucket_name>/paimon_cdc_test'
  );


CREATE DATABASE IF NOT EXISTS paimon_cdc_test.app_db;


INSERT INTO `print_orders`
SELECT * FROM `paimon_cdc_test`.`app_db`.`orders`;
  • paimon_cdc_test:Catalog 名称。
  • app_db:Database 名称。
  • orders:源表名称。

7 任务阶段查询

7.1 确认任务处于全量阶段同步进度:

查看 JMJobManager日志,开始切分 split 的关键字如下:

# 开始切分 split
2025-01-12 22:08:06,122 INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner [] - Start splitting table app_db.con_bt_point_info into chunks...

切分完之后,开始下发 snapshot split,其中 app_db.con_bt_point_info:0 表示第一个 split,根据 split 编号可以看到 split 处理的进度:

2025-01-12 22:08:42,644 INFO  org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlSnapshotSplit{tableId=app_db.con_bt_point_info, splitId='app_db.con_bt_point_info:0', splitKeyType=[`point_id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} to subtask 1

7.2 确认任务处于增量阶段同步进度:

查看 JobManager 日志查看分配 binlog split 的 subtask index:

2025-01-13 11:16:20,315 INFO  org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlBinlogSplit{splitId='binlog-split', tables=[], offset={ts_sec=0, file=binlog.000426, pos=520322, kind=SPECIFIC, gtids=da643d88-c37e-11ef-a8e8-5254ac1c7b41:1-1521914, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, kind=NON_STOPPING, row=0, event=0}, isSuspended=false} to subtask 0

查找 subtask 0 对应的 TM 日志:

搜索 TaskManager 日志里的关键字,查看 binlog 处理进度,其中 file 和 pos 分别是读取的 binlog 文件和位置:

2025-01-13 11:21:19,039 INFO  org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader [] - Binlog offset for tables [app_db.shipments, app_db.shipments90, app_db.shipments91, app_db.shipments89, app_db.shipments81, app_db.shipments82, app_db.shipments83, app_db.shipments84, app_db.shipments85, app_db.shipments86, app_db.shipments87, app_db.shipments88, app_db.shipments12] on checkpoint 2: {transaction_id=null, ts_sec=0, file=binlog.000426, pos=687715, kind=SPECIFIC, gtids=da643d88-c37e-11ef-a8e8-5254ac1c7b41:1-1522412, row=0, event=0, server_id=2051901732}

8. 常见问题

8.1 JDBC Driver 找不到错误

问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

Image
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。

Image

问题现象:在 Flink CDC 任务运行一段时间后,因为各种原因需要丢弃原来状态从全新的初始阶段开始同步。如果此时原来 Paimon 表没有删除,则会出现新数据无法写入的问题。
解决方案:这个是因为 Flink CDC Paimon 的数据下游存在缺陷,commit.user 固定不变,导致在 Paimon 写入的时候快照从 0 版本开始,因为落后当前 Paimon 表的版本,则会导致始终无法发布。有两种解决方案:

  1. 在全新启动的时候,需要删除 TOS 上所有的 Paimon 文件,保证全量同步从头开始。
  2. 在全新启动的时候,需要将 commit.user 变更成新的值,比如 commit.user: v2,可以保证当前新的 commit user 快照可以正常发布。

Image

8.3 MySQL Datetime 和 Paimon Timestamp 类型时间有差异

问题现象:MySQL 的 datetime 是一个无时区信息的数据类型,很多用户会用这个类型存取本地时间。比如 UTC+8 时区的 2025-01-09 10:00:00。因为无法确认时区,所以 Flink & Paimon 会在转型过程中默认使用 UTC 时间戳。所以在写入 Paimon 之后,使用 UTC+8 的时间再去查看这个时间结果为 2025-01-09 18:00:00。导致与 MySQL 中时间出现差异。
解决方案:

  1. 在 MySQL 中避免使用 datetime 类型字段,而采用有时区信息的 timestamp 类型字段。这样 Flink 会结合 server-time-zone 信息,进行时区转换
  2. 如果 MySQL 无法避免使用 datetime 类型,可以在 MySQL datetime 类型字段中用 UTC 时间存储,比如 UTC+8 的 2025-01-09 10:00:00,在数据库中存储为 2025-01-09 02:00:00

8.4 CDC 任务启动失败,提示动态 Bucket 模式不支持

问题现象:Flink CDC 启动失败,报错包含如下关键信息Can't extract bucket from row in dynamic bucket mode如下图所示:
Image
解决方案:当前 Flink CDC 版本暂不支持 Paimon 的动态分桶模式(bucket = -1)。所以需要在 Paimon Sink 中指定如下固定分桶的参数。

# 指定建表的时候指定的分桶数量,建议按照数据量进行合理设置
table.properties.bucket: 10

8.5 changelog-producer 应该如何选择

问题描述:Paimon 支持 changelog-producer 的选项,在 CDC 场景一般推荐使用哪个选项。
解决方案:Paimon 的 changelog-producer 功能支持下游 Flink 任务读取完整的变更日志,可以通过如下参数设置:

table.properties.changelog-producer: input

这个选项可以参考 Changelog 产出机制进行设置,在 CDC 场景下常见的设置有两种:

  1. none:如果下游不需要,则设置成 none,或者不设置即可。
  2. input:如果下游需要 changelog,因为 CDC 上游就是变更日志,所以直接选择 input 即可。