MongoDB CDC 连接器提供了从 MongoDB 数据库读取全量和增量数据的能力,仅用于做数据源表。
CREATE TABLE products ( _id bigint, order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time varchar, PRIMARY KEY (_id) NOT ENFORCED --必须定义主键。 ) WITH ( 'connector' = 'mongodb-cdc', 'hosts' = 'localhost:27017,localhost:27018,localhost:27019', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database' = 'inventory', 'collection' = 'products' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 mongodb-cdc 连接器。 |
scheme | 否 | mongodb | String | 指定连接 MongoDB 的协议,示例值 |
hosts | 是 | (none) | String | MongoDB 服务器地址,格式为 |
username | 否 | (none) | String | MongoDB 数据库服务的用户名。 |
password | 否 | (none) | String | MongoDB 数据库服务的用户密码。 |
database | 否 | (none) | String | MongoDB 数据库名称。 |
collection | 否 | (none) | String | MongoDB Collection 名称。 |
connection.options | 否 | (none) | String | MongoDB 的连接选项,有多个配置项时,需要使用 & 进行连接。示例值 |
scan.startup.mode | 否 | initial | String | MongoDB CDC Consumer 的可选启动模式,支持 |
scan.startup.timestamp-millis | 否 | (none) | Long | 设置为 |
copy.existing.queue.size | 否 | 10240 | Integer | 复制数据时要使用的队列的最大大小。 |
batch.size | 否 | 1024 | Integer | 游标批量大小(cursor batch size),指在执行查询时,MongoDB 每次返回的文档数量。 |
poll.max.batch.size | 否 | 1024 | Integer | 每次拉取数据的最大数量。 |
poll.await.time.ms | 否 | 1000 | Integer | 拉取数据的时间间隔。 |
heartbeat.interval.ms | 否 | 0 | Integer | 发送心跳消息的间隔时长。 |
scan.incremental.snapshot.enabled | 否 | false | Boolean | 是否使用增量快照,默认不使用。 |
scan.incremental.snapshot.chunk.size.mb | 否 | 64 | Integer | 增量快照的 chunk 大小,单位 MB。 |
scan.incremental.close-idle-reader.enabled | 否 | false | Boolean | 是否在快照结束后关闭空闲的读取器(reader)。 |
MongoDB CDC 和 Flink 字段类型对应关系如下:
MongoDB 字段类型 | Flink SQL 字段类型 |
---|---|
- | TINYINT |
- | SMALLINT |
Int | INT |
Long | BIGINT |
- | FLOAT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
DateTimestamp | DATE |
DateTimestamp | TIME |
Date | TIMESTAMP(3) |
Timestamp | TIMESTAMP(0) |
String | STRING |
BinData | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
Point : ROW<type STRING, coordinates ARRAY |
CREATE TABLE mongo_cdc_source ( _id bigint, order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time varchar, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb-cdc', 'hosts' = 'localhost1:3717,localhost2:3717', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database' = 'doc-db', 'collection' = 'products' ); CREATE TABLE print_table ( _id bigint, order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time varchar ) WITH ( 'connector' = 'print' ); insert into print_table select * from mongo_cdc_source;
如果修改了作业的拓扑结构,比如增减了 Source 的字段,则不能从历史状态恢复,此时可以丢状态,并从 timestamp 模式全新启动:
参数如下:
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = 'xxxx' -- 毫秒时间戳,可以从 https://tool.chinaz.com/tools/unixtime.aspx 获取