DataSail 数据集成中的 MongoDB 数据源为您提供读取和写入 MongoDB 的双向通道数据集成能力,实现不同数据源与 MongoDB 之间进行数据传输。
下文为您介绍 MongoDB 数据同步的能力支持情况。
MongoDB 使用的驱动版本是 mongo-java-driver 3.11.0,该驱动支持的内核版本为 3.4+ 版本。
驱动能力详情请参见Mongo官方文档。
MongoDB 读写支持的字段类型:
类型 | 离线写入 | 离线读取 |
---|---|---|
OBJECTID | 支持 | 支持 |
LONG | 支持 | 支持 |
STRING | 支持 | 支持 |
INT | 支持 | 支持 |
DECIMAL | 支持 | 支持 |
NULL | 支持 | 支持 |
DOUBLE | 支持 | 支持 |
DATE | 支持 | 支持 |
TIMESTAMP | 支持 | 支持 |
BINDATA | 支持 | 支持 |
BOOL | 支持 | 支持 |
REGEX | 支持 | 支持 |
JAVASCRIPT | 支持 | 支持 |
UNDEFINED | 支持 | 支持 |
JAVASCRIPTWITHSCOPE | 支持 | 支持 |
ARRAY | 支持 | 支持 |
新建数据源操作详见配置数据源,下面为您介绍用连接串方式配置 MongoDB 数据源信息.
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
基本配置 | |
*数据源类型 | MongoDB |
*接入方式 | 连接串 |
*数据源名称 | 数据源的名称,可自行设置,仅支持中文,英文,数字,“_”,100个字符以内。 |
参数配置 | |
*主机名或 IP 地址 | MongoDB 接入地址,格式为主机名: 端口,单击新增按钮,支持配置多个 Hosts。 |
鉴权数据库 | 身份认证所用库。 注意 Mongo 数据源用于实时整库解决方案时,建议配置 admin 鉴权数据库,对应用户通过 CDC 消费 Binlog,权限校验较高,需保证有 ListDatabases 和 ListConnections 权限。 |
*数据库名 | 创建 MongoDB 数据库时,数据库的名称。 |
*用户名 | 数据库登录账号名称。 |
*密码 | 数据库账号密码。 |
MongoDB 数据源测试连通性成功后,进入到数据开发界面,开始新建 MongoDB 相关通道任务。
新建任务方式详见离线数据同步。
任务创建成功后,您可根据实际场景,配置 MongoDB 离线读或 MongoDB 离线写等通道任务。
数据来源选择 MongoDB,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
*数据源类型 | 下拉选择 MongoDB 数据源类型。 |
*数据源名称 | 已在数据源管理中注册成功的 MongoDB 数据源,下拉可选。 |
*集合名 | 选择需要采集的数据集合名称,下拉可选,支持同时选择多个 Schema 相同的集合。 |
分库分表 | MongoDB 支持分库分表形式读取,单击添加分库分表按钮,进行分库分表添加,在下拉框中选择分库数据源与具体分表名称信息,支持添加多个分库分表。 注意 配置分库分表,需要所有集合的 Schema 信息必须保持一致,否则任务会执行异常。 |
数据过滤 | 支持您将需要同步的数据进行筛选条件设置,只同步符合过滤条件的数据,例如:{createTime:{$gte:ISODate('2024-02-02T00:00:00.000+0800')}}、{create_time:{$gte:ISODate('${DATE}T00:00:00.000+0800'),$lt:ISODate('${DATE+1}T00:00:00.000+0800')}}、{stringField:{$eq:"hello"}} 等 说明 该过滤语句通常用作增量同步,来限制返回 MongoDB 的数据范围。 |
*主键名 | MongoDB 中主键字段的名称信息。 |
数据目标端选择 MongoDB,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
*目标类型 | 数据去向目标类型选择 MongoDB。 |
*数据源名称 | 已在数据源管理界面注册的 MongoDB 数据源,下拉可选。 |
*集合名 | 对应数据源下,选择需要写入数据的集合名称,下拉可选。 |
*数据写入方式 | 写入模式指定了传输数据时是否覆盖的信息:
说明 当下方高级参数中配置了 write_mode 参数时,则以高级参数中配置为准,此处选择不生效。如此处选择”直接写入“方式,并且高级参数中配置 |
*业务主键 | 当写入方式为覆盖写入时,需填写 MongoDB 集合的业务主键字段名称,支持配置多个业务主键,多个主键分隔符为逗号,配置形式为:a.b.c,a.d,k。 |
写入前准备语句 | 表示数据写入 MongoDB 前,对 MongoDB 中已有数据的前置操作,例如清理历史数据等,需确保输入语句符合 JSON 语法要求。 |
数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。
字段映射支持选择基础模式和转换模式配置映射:
注意
基础模式和转换模式不支持互相切换,模式切换后,将清空现有字段映射中所有配置信息,一旦切换无法撤销,需谨慎操作。
转换模式:
字段映射支持数据转换,您可根据实际业务需求进行配置,将源端采集的数据,事先通过数据转换后,以指定格式输入到目标端数据库中。
转换模式详细操作说明详见4.1 转换模式
在转换模式中,你可依次配置:来源节点、数据转换、目标节点信息:
配置节点 | 说明 |
---|---|
来源节点 | 配置数据来源 Source 节点信息:
配置完成后,单击确认按钮,完成来源节点配置。 |
数据转换 | 单击数据转换右侧添加按钮,选择 SQL 转换方式,配置转换信息和规则:
配置完成后,单击确认按钮,完成数据转换节点配置。SQL 脚本示例详见4.1.2 添加转换节点。 |
目标节点 | 配置目标节点 Sink 信息:
配置完成后,单击确认按钮,完成目标节点配置。 |
基础模式:
您可通过以下三种方式操作字段映射关系:
Mongo 数据源支持使用脚本模式(DSL)的方式进行配置。
在某些复杂场景下,如:数据源和独享集成资源组网络存在跨 VPC,用云企业网 CEN 打通网络,但无法配置 Mongo 数据源时、或当数据源类型暂不支持可视化配置时,您可通过任务脚本的方式,按照统一的 Json 格式,编写 Mongo Reader 参数脚本代码,来运行数据集成任务。
进入 DSL 模式操作流程,可详见6 脚本模式配置。
进入 DSL 模式编辑界面后,您可根据实际情况替换相应参数,Mongo 批式读脚本示例如下:
{ // [required] dsl version, suggest to use latest version "version": "0.2", // [required] execution mode, supoort streaming / batch now "type": "batch", // reader config "reader": { // [required] datasource type "type": "mongo", // [optional] datasource id, set it if you have registered datasource "datasource_id": null, // [required] user parameter "parameter": { // ********** please write here ********** // "key" : value // 集合名称 "collection_name":"test_xxx", //过滤条件,注意要使用 mongo 语法 "filter":"{stringField:{$eq:\"hello\"}}", //主键名 "split_pk":"_id", "self_built":true, "columns":[ { "upperCaseName":"_ID", "name":"_id", "type":"objectid" }, { "upperCaseName":"STRING_FIELD", "name":"string_field", "type":"string" }, { "upperCaseName":"INT_FIELD", "name":"int_field", "type":"int" } ], //用户名 "user_name":"****", //密码 "password":"*****", // 主机名或IP地址: "hosts_str":"xxx.mongodb.volces.com:3717,xxx.mongodb.volces.com:3717", //鉴权数据库 "auth_db_name":"db_xxx", // 数据库名 "db_name":"db_1_xxx", "port":3717, "class":"com.bytedance.dts.batch.mongodb.MongoDBInputFormat" } }, // writer config "writer": { // [required] datasource type "type": "bytehouse_cdw", // 此处有数据源ID,下面连接信息不用填 "datasource_id": xxxx, // [required] user parameter "parameter": { // ********** please write here ********** // "key" : value "ch_db":"db_xxx", "ch_table":"tab_xxx", "ch_partition_type":"String", "partition":"id=20240417", "shard_num":1, "shard_column":"_id", "codec":"none", "columns":[ { "upperCaseName":"_ID", "name":"_id", "type":"String" }, { "upperCaseName":"ADDRESS", "name":"address", "type":"String" }, { "upperCaseName":"DISCOUNT", "name":"discount", "type":"UInt8" } ], "class":"com.bytedance.dts.batch.bytehouse.BytehouseCdwOutputFormat" } }, // common config "common": { // [required] user parameter "parameter": { // ********** please write here ********** // "key" : value } } }
批式写参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:
参数名 | 参数说明 |
---|---|
*type | 数据源类型,对于 Mongo 类型,填写 mongo |
*datasource_id | 注册的 Mongo 数据源 ID,可以在项目控制台 > 数据源管理界面中查找,如果因网络问题没有配置数据源,您可填写为 null。 |
*collection_name | 填写需要读取数据的 Mongo 集合名称。 |
filter | 支持您将需要同步的数据进行筛选条件设置,只同步符合过滤条件的数据,需要注意使用 Mongo 过滤语法。例如:{createTime:{$gte:ISODate('2024-02-02T00:00:00.000+0800')}}、{stringField:{$eq:"hello"}} |
*split_pk | Mongo 中主键字段的名称信息。 |
*self_built | 判断读取的是否为 Mongo 自建数据库中的数据。true 为非火山引擎 Mongo 数据库,false 为火山引擎 Mongo 数据库。 |
*columns | 需要同步的 Mongo 文档列名集合,使用 JSON 的数组描述字段信息。 |
*user_name | 输入有权限访问 Mongo 的用户名信息。 |
*password | 输入对应用户的密码信息。 |
*hosts_str | 填写访问 Mongo 数据库的主机名或 IP 地址信息,可填写多个主机名信息,用英文逗号分隔。 |
*port | 填写 Mongo 主机的端口号。 |
auth_db_name | 填写 Mongo 的鉴权数据库信息,用于身份权限验证。 |
*db_name | 填写 Mongo 集合所在的数据库名称信息。 |
*class | Mongo Reader connector type, 默认固定值:com.bytedance.dts.batch.mongodb.MongoDBInputFormat |
对于可视化通道任务,读参数需要加上 job.reader.
前缀,写参数需要加上 job.writer.
前缀,如下图所示:
离线读支持高级参数示例如下,您可根据实际情况进行配置,更多读取的高级参数可详见 4.5.1 MySQL 批式读。
参数名 | 描述 | 默认值 |
---|---|---|
reader_fetch_size | 单批次读取文档 doc 的数量。 | 100000 |
filter | 指定读取过滤条件,满足 MongoDB 语法,如读取 id = 1000 的数据,填写示例如下: | 无 |
split_mode | 分片模式支持两种:
说明 若需关闭分片,可设置并发度为 1。 | parallelism |
离线写支持以下高级参数,您可根据实际情况进行配置:
参数名 | 描述 | 默认值 |
---|---|---|
max_connection_per_host | 连接池最大连接数。 | 100 |
connect_timeout_ms | 连接超时时间。 | 10000 |
batch_size | 单批次写入 MongoDB 的数据量。 | 100 |
write_mode | 高级参数设置写入方式:
| 无 |