You need to enable JavaScript to run this app.
导航
MySQL
最近更新时间:2024.10.23 20:28:33首次发布时间:2024.10.23 20:28:33

MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。

使用限制

  • MySQL CDC 连接器暂时仅支持在 Flink 1.16-volcano 及以上引擎版本中使用。
  • 基于合规要求,如果需要使用 MySQL CDC 相关功能,需要用户自行上传 MySQL Driver 。请前往 MySQL 官网下载 8.0.27 版本,并且上传到作业开发依赖文件中。具体操作可以参考 使用 JDBC 或者 MySQL-CDC 数据源

如何创建 Pipeline

从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\., bdb.user_table_[0-9]+, [app|web].order_\.
   server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to Doris Pipeline
   parallelism: 4

连接器配置项

Option

Required

Default

Type

Description

hostname

required

(none)

String

MySQL 数据库服务器的 IP 地址或主机名。

port

optional

3306

Integer

MySQL 数据库服务器的整数端口号。

username

required

(none)

String

连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。

password

required

(none)

String

连接 MySQL 数据库服务器时使用的密码。

tables

required

(none)

String

需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。
需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
例如,db0..*, db1.user_table_[0-9]+, db[1-2].[app

tables.exclude

optional

(none)

String

需要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。
用法和tables参数相同

schema-change.enabled

optional

TRUE

Boolean

是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步,默认为true。

server-id

optional

(none)

String

读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。

scan.incremental.snapshot.chunk.size

optional

8096

Integer

表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。

scan.snapshot.fetch.size

optional

1024

Integer

读取表快照时每次读取数据的最大条数。

scan.startup.mode

optional

initial

String

MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset","timestamp" 和 ""snapshot"。

scan.startup.specific-offset.file

optional

(none)

String

在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。

scan.startup.specific-offset.pos

optional

(none)

Long

在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。

scan.startup.specific-offset.gtid-set

optional

(none)

String

在 "specific-offset" 启动模式下,启动位点的 GTID 集合。

scan.startup.timestamp-millis

optional

(none)

Long

在 "timestamp" 启动模式下,启动位点的毫秒时间戳。

scan.startup.specific-offset.skip-events

optional

(none)

Long

在指定的启动位点后需要跳过的事件数量。

scan.startup.specific-offset.skip-rows

optional

(none)

Long

在指定的启动位点后需要跳过的数据行数量。

connect.timeout

optional

30s

Duration

连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。

connect.max-retries

optional

3

Integer

连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。

connection.pool.size

optional

20

Integer

连接池大小。

jdbc.properties.*

optional

20

String

传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'.

heartbeat.interval

optional

30s

Duration

用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。

debezium.*

optional

(none)

String

将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。 例如: 'debezium.snapshot.mode' = 'never'. 查看更多关于 Debezium 的 MySQL 连接器属性

scan.incremental.close-idle-reader.enabled

optional

FALSE

Boolean

是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。
若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。

scan.newly-added-table.enabled

optional

FALSE

Boolean

是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。

启动模式

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  • snapshot: 只进行快照阶段,跳过增量阶段,快照阶段读取结束后退出。

例如,可以在 YAML 配置文件中这样指定启动模式:

source:
  type: mysql
  scan.startup.mode: earliest-offset                    # Start from earliest offset
  scan.startup.mode: latest-offset                      # Start from latest offset
  scan.startup.mode: specific-offset                    # Start from specific offset
  scan.startup.mode: timestamp                          # Start from timestamp
  scan.startup.mode: snapshot                          # Read snapshot only
  scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
  scan.startup.specific-offset.pos: 4                   # Binlog position under specific offset mode
  scan.startup.specific-offset.gtid-set: 24DA167-...    # GTID set under specific offset startup mode
  scan.startup.timestamp-millis: 1667232000000          # Timestamp under timestamp startup mode
  # ...

数据类型映射

MySQL type

CDC type

NOTE

TINYINT(n)

TINYINT

SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL

SMALLINT

INT
YEAR
MEDIUMINT
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL

INT

BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL

BIGINT

BIGINT UNSIGNED
BIGINT UNSIGNED ZEROFILL
SERIAL

DECIMAL(20, 0)

FLOAT
FLOAT UNSIGNED
FLOAT UNSIGNED ZEROFILL

FLOAT

REAL
REAL UNSIGNED
REAL UNSIGNED ZEROFILL
DOUBLE
DOUBLE UNSIGNED
DOUBLE UNSIGNED ZEROFILL
DOUBLE PRECISION
DOUBLE PRECISION UNSIGNED
DOUBLE PRECISION UNSIGNED ZEROFILL

DOUBLE

NUMERIC(p, s)
NUMERIC(p, s) UNSIGNED
NUMERIC(p, s) UNSIGNED ZEROFILL
DECIMAL(p, s)
DECIMAL(p, s) UNSIGNED
DECIMAL(p, s) UNSIGNED ZEROFILL
FIXED(p, s)
FIXED(p, s) UNSIGNED
FIXED(p, s) UNSIGNED ZEROFILL
where p <= 38

DECIMAL(p, s)

NUMERIC(p, s)
NUMERIC(p, s) UNSIGNED
NUMERIC(p, s) UNSIGNED ZEROFILL
DECIMAL(p, s)
DECIMAL(p, s) UNSIGNED
DECIMAL(p, s) UNSIGNED ZEROFILL
FIXED(p, s)
FIXED(p, s) UNSIGNED
FIXED(p, s) UNSIGNED ZEROFILL
where 38 < p <= 65

STRING

在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。

BOOLEAN
TINYINT(1)
BIT(1)

BOOLEAN

DATE

DATE

TIME [(p)]

TIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP_LTZ [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT

STRING

TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB

BYTES

目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。

ENUM

STRING

JSON

STRING

JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。

SET

暂不支持

GEOMETRY
POINT
LINESTRING
POLYGON
MULTIPOINT
MULTILINESTRING
MULTIPOLYGON
GEOMETRYCOLLECTION

STRING

MySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。 请参考 MySQL 空间数据类型映射 章节了解更多详细信息。

空间数据类型映射

MySQL中除GEOMETRYCOLLECTION之外的空间数据类型都会转换为 Json 字符串,格式固定,如:

{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}

字段srid标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。 由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid将始终为 0。
字段type标识空间数据类型,例如POINT/LINESTRING/POLYGON
字段coordinates表示空间数据的坐标
对于GEOMETRYCOLLECTION,它将转换为 Json 字符串,格式固定,如:

{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}

Geometrics字段是一个包含所有空间数据的数组。
不同空间数据类型映射的示例如下:

Spatial data in MySQL

Json String converted in Flink

POINT(1 1)

{"coordinates":[1,1],"type":"Point","srid":0}

LINESTRING(3 0, 3 3, 3 5)

{"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0}

POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))

{"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0}

MULTIPOINT((1 1),(2 2))

{"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0}

MultiLineString((1 1,2 2,3 3),(4 4,5 5))

{"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0}

MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))

{"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0}

GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))

{"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0}