You need to enable JavaScript to run this app.
导航
MySQL CDC
最近更新时间:2024.10.23 19:54:09首次发布时间:2023.09.12 16:22:50

MySQL CDC 连接器提供了从 MySQL 数据库读取全量和增量数据的能力,仅用于做数据源表。

使用限制

  • MySQL CDC 连接器暂时仅支持在 Flink 1.16-volcano 及以上引擎版本中使用。
  • 支持 MySQL 版本为 5.6, 5.7, 8.x。
  • 如果您需要使用 MySQL CDC 连接器连接云数据库 veDB MySQL 版,您的连接终端请按照以下要求配置,否则可能会因为自定义连接终端的限制而出现任务故障。
    如需详细了解各参数含义,请参见编辑连接终端
    • 读写模式:配置为读写
    • 一致性级别:配置为最终一致性
    • 主节点接受读:关闭该选项。
    • 事务拆分:打开该选项。
  • 基于合规要求,如果需要使用 MySQL CDC 相关功能,需要用户自行上传 MySQL Driver 。请前往 MySQL 官网下载 8.0.27 版本,并且上传到作业开发依赖文件中。具体操作可以参考 使用 JDBC 或者 MySQL-CDC 数据源

DDL 定义

CREATE TABLE orders (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time timestamp,
    PRIMARY KEY (order_id) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义主键。
     ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'localhost',
     'port' = '3306',
     'username' = 'flinkuser',
     'password' = 'flinkpw',
     'database-name' = 'mydb',
     'table-name' = 'orders'
     );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 mysql-cdc 连接器。

hostname

(none)

String

MySQL 数据库服务器的 IP 地址或主机名。
推荐使用主库地址

port

3306

Integer

MySQL 数据库服务器的端口号。

username

(none)

String

MySQL 数据库服务器的用户名称。

password

(none)

String

MySQL 数据库服务器的用户密码。

database-name

(none)

String

数据库名称。
数据库名称支持正则表达式,以匹配多个库。

table-name

(none)

String

Table 名称。
Table 名称支持正则表达式,以匹配多个表。

server-id

(none)

Integer

读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 5400 或 5400~5408。
默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是建议用户明确指定 Server id。

说明

如果scan.incremental.snapshot.enabled参数设置为 true 时,建议 server id 配置成整数范围。

scan.incremental.snapshot.enabled

true

Boolean

增量快照读取机制。

说明

如果需要保证 Source 的并发运行,那么需要保证拥有唯一的 server id,因此建议 server id 配置成整数范围。

scan.incremental.snapshot.chunk.size

8096

Integer

读取表的快照时,捕获的表被拆为多少个块。

scan.snapshot.fetch.size

1024

Integer

读取表快照时,每次读取数据的最大条数。

scan.startup.mode

initial

String

MySQL CDC 消费者可选的启动模式。

  • initial:首次启动时对数据库表执行初始快照,并继续读取最新的 binlog。
  • earliest-offset:跳过快照阶段,从可读取的第一 binlog 位点开始读取。
  • latest-offset:首次启动时,不对数据库表执行快照,连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取连接器启动之后的数据更改。
  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可以通过 binlog 文件名和位置指定,或者通过 GTID 集合指定。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

scan.startup.timestamp-millis

(none)

Long

设置为timestamp启动模式时的起点时间戳。
注意:不建议使用,当前版本会从最旧的 binlog 文件消费,再根据时间戳过滤需要的数据,由于读取大量 binlog,性能会比较差。

scan.startup.specific-offset.file

(none)

String

specific-offset启动模式下,启动位点的 binlog 文件名。

scan.startup.specific-offset.pos

(none)

Long

specific-offset启动模式下,启动位点的 binlog 文件位置。
注意:如果要从binlog 文件的开头开始读取,则设置 pos 为 4,因为前四个字节是 binlog 文件的 header。

scan.startup.specific-offset.gtid-set

(none)

String

specific-offset启动模式下,启动位点的 GTID 集合。

scan.startup.specific-offset.skip-events

(none)

Long

specific-offset启动模式下,在指定的启动位点后需要跳过的事件数量。

scan.startup.specific-offset.skip-rows

(none)

Long

specific-offset启动模式下,在指定的启动位点后需要跳过的数据行数量。

server-time-zone

(none)

String

数据库使用的会话时区,例如 Asia/Shanghai。该参数控制了 MySQL 中的 TIMESTAMP 类型如何转成 STRING 类型。

debezium.min.row. count.to.stream.result

1000

Integer

在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。该参数用于指定将事件传递到下游时,表必须包含的最小行数,默认值为 1000。
如果将此参数设置为 0,表示无需等待收集一定数量的数据,只要有一行就会立即传输。

connect.timeout

30s

Duration

连接 MySQL 数据库服务器的最长等待时间。

connect.max-retries

3

Integer

与 MySQL 数据库服务器重连的最大次数。

connection.pool.size

20

Integer

连接池大小。

jdbc.properties.*

20

String

自定义 JDBC URL 参数,例如:'jdbc.properties.useSSL' = 'false'

heartbeat.interval

30s

Duration

发送心跳事件的时间间隔,用于跟踪最新可用的 binlog 偏移量,一般用于解决慢表的问题(更新缓慢的数据表)。

debezium.*

(none)

String

Debezium 属性参数,从更细粒度控制 Debezium 客户端的行为。例如'debezium.snapshot.mode' = 'never'
如需了解更多 Debezium 属性,请参见 Debezium 属性

debezium.skipped.operations

(none)

String

选择要过滤的操作类型,其中 c 表示插入,u 表示更新,d 表示删除。
举例:如果配置为 'd',则表示过滤 delete 消息,仅同步 insert、update 消息

scan.incremental.close-idle-reader.enabled

false

Boolean

是否在快照结束后关闭空闲的读取器(reader)。

scan.read-changelog-as-append-only.enabled

false

Boolean

是否将changelog数据流转换为append-only数据流。
参数取值如下:

  • true:所有类型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都会转换成INSERT类型的消息。仅在需要保存上游表删除消息等特殊场景下开启使用。
  • false(默认):所有类型的消息都保持原样下发。

支持的元数据

下表中的元数据可以在 DDL 中作为只读(虚拟)meta 列声明。

Key

DataType

Description

table_name

STRING NOT NULL

当前记录所属的表名称。

database_name

STRING NOT NULL

当前记录所属的库名称。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

当前记录表在数据库中更新的时间。
如果从表的快照而不是 binlog 读取记录,该值将始终为0。

下述创建表示例展示元数据列的用法:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

最佳实践

分库分表同步

在多库分表场景下,通过正则表达式匹配多张库表,来实现合并多张分表到一张表的功能,示例如下:

CREATE TABLE products (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  table_name STRING METADATA  FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 正则表达式匹配多个库。
  'table-name' = '(t[5-8]|tt)' -- 正则表达式匹配多张表。
);

匹配示例

表达式

描述

前缀匹配

^(test).*

匹配前缀为test的数据库名或表名,例如test1、test2等。

后缀匹配

.*[p$]

匹配后缀为p的数据库名或表名,例如cdcp、edcp等。

特定匹配

txc

匹配具体的数据库名或表名。

进行库表匹配时,会使用正则表达式 database-name\\.table-name 来与MySQL表的全限定名做匹配,所以该例子使用 (^(test).*|^(tpc).*|txc|.*[p$]|t{2})\\.(t[5-8]|tt),可以匹配到表 txc.tt、test2.test5。

全量拉取分片键指定

在全量读取阶段,对于数据库表比较大的情况下,可能会对数据库造成压力,MySQL CDC 在 2.0 版本后在全量同步阶段基于论文 DBLog Paper,MySql CDC Connector 实现了一种新的全量数据拉取方式,这种方式可以并行地拉取全量数据,并且不需要全局的读锁(FLUSH TABLES WITH READ LOCK)。这种算法会把 MySQL 的全量数据划分成不同的小的 chunk,并且可以并发的读取(详细原理可以参考文档)。用户可以配置用来划分 chunk 的 key 和 chunk size 来控制 chunk 的划分和每个 chunk 的大小。其中 chunk key 满足以下条件最优:

  • 数字类型:方便根据每个 chunk 的大小,快速的计算每个 chunk 的 start 和 end 值;
  • 分布均匀:保证落在每个 chunk 里的数据分布是均匀的,否则会造成大部分数据分布在少量的 chunk 内,在数据拉取阶段产生数据倾斜

FAQ

过滤 delete 消息

在业务使用过程中,经常有上游 MySQL Source 为了节省存储空间,定期把历史数据删除,只保留七天内的数据。但是业务需要 Sink 的 OLAP 系统保留历史全量的数据,因此需要把 Source 定期发送的 delete 消息都忽略掉。
解决方法:添加参数 'debezium.skipped.operations' = 'd',则表示过滤 delete 消息。

丢状态重启

如果修改了作业的拓扑结构,比如增减了 Source 的字段,则不能从历史状态恢复,此时可以丢状态,并从 specific-offset 模式全新启动。步骤如下:

  1. 获取当前位点信息,具体是打开 TM 日志,每次 checkpoint,日志里都会打印每个表的位点信息,包括 binlog file 和 binlog position。
2024-09-27 18:16:10,488 INFO  com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] - Binlog offset for tables [test_db.test] on checkpoint 4: {transaction_id=null, ts_sec=0, file=mysql-binlog.000666, pos=147223270, kind=SPECIFIC, gtids=e6e6e8e3-6351-11ef-8340-52540bd1ce99:81433397-81559559, row=0, event=20, server_id=1}
  1. 修改 SQL 里 MySQL CDC Source 的启动模式,从上述位点恢复。
create table
  mysql_cdc_source (
    `id` INT NOT NULL,
    `a` DECIMAL(10, 2) NOT NULL,
    `b` DOUBLE,
    `c` DATE,
    `d` VARCHAR,
    PRIMARY KEY (`id`) NOT ENFORCED
  )
WITH
  (
    'connector' = 'mysql-cdc',
    'hostname' = 'vedbm-xxx.pri.mysql.vedb.ivolces.com',
    'port' = '3306',
    'username' = 'xroot',
    'password' = 'xxxx',
    'database-name' = 'test_db',
    'table-name' = 'test',
    'scan.startup.mode' = 'specific-offset', -- 从指定的 binlog 位点开始读取。位点可以通过 binlog 文件名和位置指定
    'scan.startup.specific-offset.file' = 'binlog.000666',
    'scan.startup.specific-offset.pos' = '147223270'
  );
  1. 重启作业,从全新状态启动。