You need to enable JavaScript to run this app.
导航
MongoDB CDC
最近更新时间:2024.10.23 19:54:09首次发布时间:2023.09.12 16:22:50

MongoDB CDC 连接器提供了从 MongoDB 数据库读取全量和增量数据的能力,仅用于做数据源表。

使用限制

  • MongoDB CDC 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
  • MongoDB CDC 仅支持作为数据源表,MongoDB CDC 支持 3.6、4.X、5.0、6.0 版本。

DDL 定义

CREATE TABLE products (
    _id bigint,
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time varchar,
    PRIMARY KEY (_id) NOT ENFORCED  --必须定义主键。
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 mongodb-cdc 连接器。

scheme

mongodb

String

指定连接 MongoDB 的协议,示例值mongodbmongodb+srv

hosts

(none)

String

MongoDB 服务器地址,格式为ip:port
如果有多个地址,需要用英文逗号(,)分隔。

username

(none)

String

MongoDB 数据库服务的用户名。
仅当 MongoDB 设置为需要身份验证时需要设置用户名。

password

(none)

String

MongoDB 数据库服务的用户密码。
仅当 MongoDB 设置为需要身份验证时需要设置用户密码。

database

(none)

String

MongoDB 数据库名称。
支持使用正则表达式匹配数据库;如果不设置,则表示全部数据库。

collection

(none)

String

MongoDB Collection 名称。
支持使用正则表达式匹配 Collection;如果不设置,则表示全部 Collection。

connection.options

(none)

String

MongoDB 的连接选项,有多个配置项时,需要使用 & 进行连接。示例值replicaSet=test&connectTimeoutMS=300000
如需了解连接选项的更多信息,请参见Connection String Options

scan.startup.mode

initial

String

MongoDB CDC Consumer 的可选启动模式,支持initiallatest-offsettimestamp
如需了解启动模式,请参见Startup Reading Position

scan.startup.timestamp-millis

(none)

Long

设置为timestamp启动模式时的起点时间戳。

copy.existing.queue.size

10240

Integer

复制数据时要使用的队列的最大大小。

batch.size

1024

Integer

游标批量大小(cursor batch size),指在执行查询时,MongoDB 每次返回的文档数量。

poll.max.batch.size

1024

Integer

每次拉取数据的最大数量。
默认值 1024,表示在拉取间隔(默认 1000 ms)下最多能拉取 1024 条数据。

poll.await.time.ms

1000

Integer

拉取数据的时间间隔。

heartbeat.interval.ms

0

Integer

发送心跳消息的间隔时长。
默认为 0ms,表示禁用心跳信息,这意味着不会发送心跳信息来维持连接的活动状态,可能会出现一段时间内无法检测出连接中断或故障。
心跳信息时间不宜过短,以免频繁发送心跳信息导致网络开销;不宜过长,以保证能及时检测连接中断或故障。

scan.incremental.snapshot.enabled

false

Boolean

是否使用增量快照,默认不使用。
仅在 MongoDB 4.0 版本以后支持增量快照。

scan.incremental.snapshot.chunk.size.mb

64

Integer

增量快照的 chunk 大小,单位 MB。

scan.incremental.close-idle-reader.enabled

false

Boolean

是否在快照结束后关闭空闲的读取器(reader)。

类型映射

MongoDB CDC 和 Flink 字段类型对应关系如下:

MongoDB 字段类型

Flink SQL 字段类型

-

TINYINT

-

SMALLINT

Int

INT

Long

BIGINT

-

FLOAT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

DateTimestamp

DATE

DateTimestamp

TIME

Date

TIMESTAMP(3)
TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)
TIMESTAMP_LTZ(0)

String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex

STRING

BinData

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point : ROW<type STRING, coordinates ARRAY>
Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
...

示例代码

CREATE TABLE mongo_cdc_source (
    _id bigint,
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time varchar,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost1:3717,localhost2:3717',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'doc-db',
  'collection' = 'products'
);
CREATE TABLE print_table (
    _id bigint,
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time varchar
) WITH (
 'connector' = 'print'
);
insert into print_table 
select * from mongo_cdc_source;

FAQ

如果修改了作业的拓扑结构,比如增减了 Source 的字段,则不能从历史状态恢复,此时可以丢状态,并从 timestamp 模式全新启动:
参数如下:

'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = 'xxxx' -- 毫秒时间戳,可以从 https://tool.chinaz.com/tools/unixtime.aspx 获取