MySQL CDC 连接器提供了从 MySQL 数据库读取全量和增量数据的能力,仅用于做数据源表。
CREATE TABLE orders ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义主键。 ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb', 'table-name' = 'orders' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 mysql-cdc 连接器。 |
hostname | 是 | (none) | String | MySQL 数据库服务器的 IP 地址或主机名。 |
port | 否 | 3306 | Integer | MySQL 数据库服务器的端口号。 |
username | 是 | (none) | String | MySQL 数据库服务器的用户名称。 |
password | 是 | (none) | String | MySQL 数据库服务器的用户密码。 |
database-name | 是 | (none) | String | 数据库名称。 |
table-name | 是 | (none) | String | Table 名称。 |
server-id | 否 | (none) | Integer | 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 5400 或 5400~5408。 说明 如果 |
scan.incremental.snapshot.enabled | 否 | true | Boolean | 增量快照读取机制。 说明 如果需要保证 Source 的并发运行,那么需要保证拥有唯一的 server id,因此建议 server id 配置成整数范围。 |
scan.incremental.snapshot.chunk.size | 否 | 8096 | Integer | 读取表的快照时,捕获的表被拆为多少个块。 |
scan.snapshot.fetch.size | 否 | 1024 | Integer | 读取表快照时,每次读取数据的最大条数。 |
scan.startup.mode | 否 | initial | String | MySQL CDC 消费者可选的启动模式。
|
scan.startup.timestamp-millis | 否 | (none) | Long | 设置为 |
scan.startup.specific-offset.file | 否 | (none) | String | 在 |
scan.startup.specific-offset.pos | 否 | (none) | Long | 在 |
scan.startup.specific-offset.gtid-set | 否 | (none) | String | 在 |
scan.startup.specific-offset.skip-events | 否 | (none) | Long | 在 |
scan.startup.specific-offset.skip-rows | 否 | (none) | Long | 在 |
server-time-zone | 否 | (none) | String | 数据库使用的会话时区,例如 Asia/Shanghai。该参数控制了 MySQL 中的 TIMESTAMP 类型如何转成 STRING 类型。 |
debezium.min.row. count.to.stream.result | 否 | 1000 | Integer | 在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。该参数用于指定将事件传递到下游时,表必须包含的最小行数,默认值为 1000。 |
connect.timeout | 否 | 30s | Duration | 连接 MySQL 数据库服务器的最长等待时间。 |
connect.max-retries | 否 | 3 | Integer | 与 MySQL 数据库服务器重连的最大次数。 |
connection.pool.size | 否 | 20 | Integer | 连接池大小。 |
jdbc.properties.* | 否 | 20 | String | 自定义 JDBC URL 参数,例如: |
heartbeat.interval | 否 | 30s | Duration | 发送心跳事件的时间间隔,用于跟踪最新可用的 binlog 偏移量,一般用于解决慢表的问题(更新缓慢的数据表)。 |
debezium.* | 否 | (none) | String | Debezium 属性参数,从更细粒度控制 Debezium 客户端的行为。例如 |
debezium.skipped.operations | 否 | (none) | String | 选择要过滤的操作类型,其中 c 表示插入,u 表示更新,d 表示删除。 |
scan.incremental.close-idle-reader.enabled | 否 | false | Boolean | 是否在快照结束后关闭空闲的读取器(reader)。 |
scan.read-changelog-as-append-only.enabled | 否 | false | Boolean | 是否将changelog数据流转换为append-only数据流。
|
下表中的元数据可以在 DDL 中作为只读(虚拟)meta 列声明。
Key | DataType | Description |
---|---|---|
table_name | STRING NOT NULL | 当前记录所属的表名称。 |
database_name | STRING NOT NULL | 当前记录所属的库名称。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 当前记录表在数据库中更新的时间。 |
下述创建表示例展示元数据列的用法:
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'mydb', 'table-name' = 'orders' );
在多库分表场景下,通过正则表达式匹配多张库表,来实现合并多张分表到一张表的功能,示例如下:
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 正则表达式匹配多个库。 'table-name' = '(t[5-8]|tt)' -- 正则表达式匹配多张表。 );
匹配示例 | 表达式 | 描述 |
---|---|---|
前缀匹配 | ^(test).* | 匹配前缀为test的数据库名或表名,例如test1、test2等。 |
后缀匹配 | .*[p$] | 匹配后缀为p的数据库名或表名,例如cdcp、edcp等。 |
特定匹配 | txc | 匹配具体的数据库名或表名。 |
进行库表匹配时,会使用正则表达式 database-name\\.table-name
来与MySQL表的全限定名做匹配,所以该例子使用 (^(test).*|^(tpc).*|txc|.*[p$]|t{2})\\.(t[5-8]|tt)
,可以匹配到表 txc.tt、test2.test5。
在全量读取阶段,对于数据库表比较大的情况下,可能会对数据库造成压力,MySQL CDC 在 2.0 版本后在全量同步阶段基于论文 DBLog Paper,MySql CDC Connector 实现了一种新的全量数据拉取方式,这种方式可以并行地拉取全量数据,并且不需要全局的读锁(FLUSH TABLES WITH READ LOCK)。这种算法会把 MySQL 的全量数据划分成不同的小的 chunk,并且可以并发的读取(详细原理可以参考文档)。用户可以配置用来划分 chunk 的 key 和 chunk size 来控制 chunk 的划分和每个 chunk 的大小。其中 chunk key 满足以下条件最优:
在业务使用过程中,经常有上游 MySQL Source 为了节省存储空间,定期把历史数据删除,只保留七天内的数据。但是业务需要 Sink 的 OLAP 系统保留历史全量的数据,因此需要把 Source 定期发送的 delete 消息都忽略掉。
解决方法:添加参数 'debezium.skipped.operations' = 'd',则表示过滤 delete 消息。
如果修改了作业的拓扑结构,比如增减了 Source 的字段,则不能从历史状态恢复,此时可以丢状态,并从 specific-offset 模式全新启动。步骤如下:
2024-09-27 18:16:10,488 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] - Binlog offset for tables [test_db.test] on checkpoint 4: {transaction_id=null, ts_sec=0, file=mysql-binlog.000666, pos=147223270, kind=SPECIFIC, gtids=e6e6e8e3-6351-11ef-8340-52540bd1ce99:81433397-81559559, row=0, event=20, server_id=1}
create table mysql_cdc_source ( `id` INT NOT NULL, `a` DECIMAL(10, 2) NOT NULL, `b` DOUBLE, `c` DATE, `d` VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'vedbm-xxx.pri.mysql.vedb.ivolces.com', 'port' = '3306', 'username' = 'xroot', 'password' = 'xxxx', 'database-name' = 'test_db', 'table-name' = 'test', 'scan.startup.mode' = 'specific-offset', -- 从指定的 binlog 位点开始读取。位点可以通过 binlog 文件名和位置指定 'scan.startup.specific-offset.file' = 'binlog.000666', 'scan.startup.specific-offset.pos' = '147223270' );