SQLServer CDC 连接器用于从 SQLServer 数据库读取全量数据和增量数据,仅支持做数据源表。
CREATE TABLE sqlserver_source ( order_id bigint, order_customer_id bigint, order_product_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'sqlserver-cdc', 'hostname' = 'mssql****85.rds-mssql.ivolces.com', 'port' = '1433', 'username' = 'doc_user', 'password' = 'Pwd***5!', 'database-name' = 'doc_autotest', 'table-name' = 'dbo.orders' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 sqlserver-cdc 连接器。 |
hostname | 是 | (none) | String | SQLServer 数据库的 IP 地址或主机名。 |
username | 是 | (none) | String | SQLServer SQLServer 数据库的用户名称。 |
password | 是 | (none) | String | SQLServer 数据库的用户密码。 |
database-name | 是 | (none) | String | SQLServer 数据库名称。 |
table-name | 是 | (none) | String | SQLServer Table 名称。表名支持正则表达式以读取多个表的数据。 |
port | 否 | 1433 | Integer | SQLServer 数据库服务的端口号,默认值为 1433。 |
server-time-zone | 否 | UTC | String | SQLServer 数据库会话时区设置,例如 'Asia/Shanghai'。 |
scan.incremental.snapshot.enabled | 否 | true | Boolean | 是否启用增量快照,默认启用。 |
chunk-meta.group.size | 否 | 1000 | Integer | 快照 chunk 元数据的 group 大小,是指如果元数据的大小超过了组大小,那么元数据将会被分成多个组。 |
chunk-key.even-distribution.factor.lower-bound | 否 | 0.05d | Double | 块键(chunk key)的均匀分布因子下限。 说明 分块键分布因子用于确定表的分布是否均匀的。当数据分布均匀时,表块将使用均匀计算优化;当数据分布不均匀时,将会发生拆分查询。 |
chunk-key.even-distribution.factor.upper-bound | 否 | 1000.0d | Double | 块键(chunk key)的均匀分布因子上限。 |
debezium.* | 否 | (none) | String | Debezium 属性参数,从更细粒度控制 Debezium 客户端的行为。例如 |
scan.incremental.close-idle-reader.enabled | 否 | false | Boolean | 是否在快照结束后关闭空闲的读取器(reader)。 |
在表定义中,以下格式的元数据可以作为只读(VIRTUAL)列暴露出来。
元数据名称 | 数据类型 | 描述 |
---|---|---|
table_name | STRING NOT NULL | 包含该 Row 的表名称。 |
schema_name | STRING NOT NULL | 包含该 Row 的 schema 名称。 |
database_name | STRING NOT NULL | 包含该 Row 的数据库名称。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | Row 在数据库中进行更改的时间。全量阶段数据,该字段值为 0。 |
元数据使用示例:
CREATE TABLE products ( table_name STRING METADATA FROM 'table_name' VIRTUAL, schema_name STRING METADATA FROM 'schema_name' VIRTUAL, db_name STRING METADATA FROM 'database_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3) ) WITH ( 'connector' = 'sqlserver-cdc', 'hostname' = 'localhost', 'port' = '1433', 'username' = 'sa', 'password' = 'Password!', 'database-name' = 'inventory', 'schema-name' = 'dbo', 'table-name' = 'products'
SQLServer CDC 和 Flink 字段类型对应关系如下:
SQLServer CDC 字段类型 | Flink 字段类型 |
---|---|
char(n) | CHAR(n) |
varchar(n) | VARCHAR(n) |
text | STRING |
decimal(p, s) | DECIMAL(p, s) |
numeric | NUMERIC |
float | DOUBLE |
bit | BOOLEAN |
int | INT |
tinyint | SMALLINT |
smallint | SMALLINT |
bigint | BIGINT |
date | DATE |
time(n) | TIME(n) |
datetime2 | TIMESTAMP(n) |
datetimeoffset | TIMESTAMP_LTZ(3) |
CREATE TABLE sqlserver_source ( order_id bigint, order_customer_id bigint, order_product_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'sqlserver-cdc', 'hostname' = 'mssql****85.rds-mssql.ivolces.com', 'port' = '1433', 'username' = 'doc_user', 'password' = 'Pwd***5!', 'database-name' = 'doc_autotest', 'table-name' = 'dbo.orders' ); CREATE TABLE print_sink ( order_id bigint, order_customer_id bigint, order_product_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * FROM sqlserver_source;