Postgres CDC 连接器用于从 PostgreSQL 数据库读取全量快照数据和增量数据,仅支持做数据源表。
scan.incremental.snapshot.enabled=false
(默认配置)时,如果任务在全表扫描阶段触发 Checkpoint,则可能由于 Checkpoint 超时导致任务 Failover。因此,建议您在 Flink 参数中配置 Checkpoint 时间间隔,以及配置 Task 重启策略,以避免在全量同步阶段由于 Checkpoint 超时导致任务 Failover。CREATE TABLE pgsql_source ( order_id bigint, order_customer_id bigint, order_product_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (`order_id`) NOT ENFORCED -- 如果要数据库表定义了主键, 则这里也需要定义。 ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'postgre***da.rds-pg.ivolces.com', 'port' = '5432', 'username' = 'doc_user', 'password' = 'Pw**45!', 'database-name' = 'doc_autotest', 'schema-name' = 'public', 'table-name' = 'orders', 'slot.name' = 'order' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 postgres-cdc 连接器。 |
hostname | 是 | (none) | String | PostgreSQL 数据库的 IP 地址或主机名。 |
username | 是 | (none) | String | PostgreSQL 数据库的用户名称。 |
password | 是 | (none) | String | PostgreSQL 数据库的用户密码。 |
database-name | 是 | (none) | String | PostgreSQL 数据库名称。 |
schema-name | 是 | (none) | String | PostgreSQL Schema 名称。Schema 名称支持正则表达式以读取多个 Schema 的数据。 |
table-name | 是 | (none) | String | PostgreSQL Table 名称。表名支持正则表达式以读取多个表的数据。 |
port | 否 | 5432 | Integer | Postgres 数据库服务的端口号,默认值为 5432。 |
slot.name | 否 | flink | String | 逻辑解码槽的名字。 说明 建议每个表都设置 |
decoding.plugin.name | 否 | decoderbufs | String | Postgres Logical Decoding 插件名称。根据 Postgres 服务上安装的插件确定。支持的插件列表如下:
|
changelog-mode | 否 | all | String | 使用的更改日志模式(changelog mode),支持以下两种模式。
|
heartbeat.interval.ms | 否 | 30s | Duration | 发送心跳消息的间隔时长。 |
debezium.* | 否 | (none) | String | Debezium 属性参数。 |
debezium.snapshot.select.statement.overrides | 否 | (none) | String | 如果表中有大量数据,但您不需要全部历史数据时,您可以在 Debezium 的底层配置中指定要进行快照的数据范围。该参数只会影响快照,不会影响后续的数据读取消费。 说明 在 PostgreSQL 中,必须使用 Schema 和 Table 名称进行配置,例如 |
debezium.snapshot.select.statement.overrides.[schema].[table] | 否 | (none) | String | 如果您想要限制快照的数据范围,可以使用
|
scan.incremental.snapshot.enabled | 否 | false | Boolean | 是否使用增量快照,默认不使用。
|
当scan.incremental.snapshot.enabled=true
时,以下增量快照参数可用。
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | 快照的 chunk 大小(行数),指在读取表快照时,将捕获的表分割成多个较小的块或部分。 |
scan.startup.mode | 否 | initial | String | Postgres CDC Consumer 的可选启动模式,支持
|
chunk-meta.group.size | 否 | 1000 | Integer | 快照 chunk 元数据的 group 大小,是指如果元数据的大小超过了组大小,那么元数据将会被分成多个组。 |
connect.timeout | 否 | 30s | Duration | 连接器连接到 PostgreSQL 服务后的最长等待时长。 |
connect.pool.size | 否 | 30 | Integer | 连接池大小。 |
connect.max-retries | 否 | 3 | Integer | 与 PostgreSQL 数据库服务器重连的最大次数。 |
scan.snapshot.fetch.size | 否 | 1024 | Integer | 读取表快照时,每次读取数据的最大条数。 |
scan.incremental.snapshot.chunk.key-column | 否 | (none) | String | 表快照的分块键(chunk key),指在读取表快照时,通过一个分块键将捕获的表分割成多个块。默认情况下,分块键是主键的第一列。该列必须是主键的一部分。 |
chunk-key.even-distribution.factor.lower-bound | 否 | 0.05d | Double | 块键(chunk key)的均匀分布因子下限。 说明 分块键分布因子用于确定表的分布是否均匀的。当数据分布均匀时,表块将使用均匀计算优化;当数据分布不均匀时,将会发生拆分查询。 |
chunk-key.even-distribution.factor.upper-bound | 否 | 1000.0d | Double | 块键(chunk key)的均匀分布因子上限。 |
在表定义中,以下格式的元数据可以作为只读(VIRTUAL)列暴露出来。
元数据名称 | 数据类型 | 描述 |
---|---|---|
table_name | STRING NOT NULL | 包含该 Row 的表名称。 |
schema_name | STRING NOT NULL | 包含该 Row 的 schema 名称。 |
database_name | STRING NOT NULL | 包含该 Row 的数据库名称。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | Row 在数据库中进行更改的时间。全量阶段数据,该字段值为 0。 |
元数据使用示例:
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'localhost', 'port' = '5432', 'username' = 'postgres', 'password' = 'postgres', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments' );
Postgres CDC 和 Flink 字段类型对应关系如下:
Postgres CDC 字段类型 | Flink 字段类型 |
---|---|
TINYINT | |
SMALLINT | SMALLINT |
INTEGER | INT |
BIGINT | BIGINT |
DECIMAL(20, 0) | |
BIGINT | BIGINT |
REAL | FLOAT |
FLOAT8 | DOUBLE |
NUMERIC(p, s) | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
BYTEA | BYTES |
CREATE TABLE pgsql_source ( order_id bigint, order_customer_id bigint, order_product_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (`order_id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义。 ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'postgre***da.rds-pg.ivolces.com', 'port' = '5432', 'username' = 'doc_autotest', 'password' = 'Pw**45!', 'database-name' = 'doc_autotest', 'schema-name' = 'public', 'table-name' = 'orders', 'slot.name' = 'order' ); CREATE TABLE print_table ( order_id bigint, order_customer_id bigint, order_product_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'print' ); INSERT INTO print_table SELECT * FROM pgsql_source;