You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Connector 参考
MongoDB CDC
复制全文
下载 pdf
MongoDB CDC

MongoDB CDC 连接器提供了从 MongoDB 数据库读取全量和增量数据的能力,仅用于做数据源表。

使用限制

  • MongoDB CDC 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
  • MongoDB CDC 仅支持作为数据源表,MongoDB CDC 支持 3.6、4.X、5.0、6.0 版本。

DDL 定义

CREATE TABLE products (
    _id bigint,
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time varchar,
    PRIMARY KEY (_id) NOT ENFORCED  --必须定义主键。
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 mongodb-cdc 连接器。

scheme

mongodb

String

指定连接 MongoDB 的协议,示例值mongodbmongodb+srv

hosts

(none)

String

MongoDB 服务器地址,格式为ip:port
如果有多个地址,需要用英文逗号(,)分隔。

username

(none)

String

MongoDB 数据库服务的用户名。
仅当 MongoDB 设置为需要身份验证时需要设置用户名。

password

(none)

String

MongoDB 数据库服务的用户密码。
仅当 MongoDB 设置为需要身份验证时需要设置用户密码。

database

(none)

String

MongoDB 数据库名称。
支持使用正则表达式匹配数据库;如果不设置,则表示全部数据库。

collection

(none)

String

MongoDB Collection 名称。
支持使用正则表达式匹配 Collection;如果不设置,则表示全部 Collection。

connection.options

(none)

String

MongoDB 的连接选项,有多个配置项时,需要使用 & 进行连接。示例值replicaSet=test&connectTimeoutMS=300000
如需了解连接选项的更多信息,请参见Connection String Options

scan.startup.mode

initial

String

MongoDB CDC Consumer 的可选启动模式,支持initiallatest-offsettimestamp

scan.startup.timestamp-millis

(none)

Long

设置为timestamp启动模式时的起点时间戳。

copy.existing.queue.size

10240

Integer

复制数据时要使用的队列的最大大小。

batch.size

1024

Integer

游标批量大小(cursor batch size),指在执行查询时,MongoDB 每次返回的文档数量。

poll.max.batch.size

1024

Integer

每次拉取数据的最大数量。
默认值 1024,表示在拉取间隔(默认 1000 ms)下最多能拉取 1024 条数据。

poll.await.time.ms

1000

Integer

拉取数据的时间间隔。

heartbeat.interval.ms

0

Integer

发送心跳消息的间隔时长。
默认为 0ms,表示禁用心跳信息,这意味着不会发送心跳信息来维持连接的活动状态,可能会出现一段时间内无法检测出连接中断或故障。
心跳信息时间不宜过短,以免频繁发送心跳信息导致网络开销;不宜过长,以保证能及时检测连接中断或故障。

scan.incremental.snapshot.enabled

false

Boolean

是否使用增量快照,默认不使用。
仅在 MongoDB 4.0 版本以后支持增量快照。

scan.incremental.snapshot.chunk.size.mb

64

Integer

增量快照的 chunk 大小,单位 MB。

scan.incremental.close-idle-reader.enabled

false

Boolean

是否在快照结束后关闭空闲的读取器(reader)。

类型映射

MongoDB CDC 和 Flink 字段类型对应关系如下:

MongoDB 字段类型

Flink SQL 字段类型

-

TINYINT

-

SMALLINT

Int

INT

Long

BIGINT

-

FLOAT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

DateTimestamp

DATE

DateTimestamp

TIME

Date

TIMESTAMP(3)
TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)
TIMESTAMP_LTZ(0)

String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex

STRING

BinData

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point : ROW<type STRING, coordinates ARRAY>
Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
...

示例代码

CREATE TABLE mongo_cdc_source (
    _id bigint,
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time varchar,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost1:3717,localhost2:3717',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'doc-db',
  'collection' = 'products'
);
CREATE TABLE print_table (
    _id bigint,
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time varchar
) WITH (
 'connector' = 'print'
);
insert into print_table 
select * from mongo_cdc_source;

FAQ

如果修改了作业的拓扑结构,比如增减了 Source 的字段,则不能从历史状态恢复,此时可以丢状态,并从 timestamp 模式全新启动:
参数如下:

'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = 'xxxx' -- 毫秒时间戳,可以从 https://tool.chinaz.com/tools/unixtime.aspx 获取

最近更新时间:2025.07.15 16:53:16
这个页面对您有帮助吗?
有用
有用
无用
无用