DataX 是开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。DataX 作为数据同步框架,它将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,使用 DataX 框架可以支持多种数据源类型的数据互通同步工作。
详见:https://github.com/alibaba/DataX
本文将为您介绍在火山引擎大数据研发治理套件 DataLeap 上,通过 Shell 任务调用 DataX 的方式,将火山引擎云数据库 MySQL 与 文档数据库 MongDB 进行数据互通。
注意
若仅开通 Dataleap 大数据集成服务,不支持创建 Shell 任务。
--创建表 create table mysql_mongodb ( id int unsigned auto_increment, name varchar(400) not null, address varchar(400) not null, create_time bigint not null, event_time bigint not null, price double not null, date_info date not null PRIMARY KEY ( `Id` ) ); --插入以下测试数据 125037,张三,天津市丽县,1669862281,1668155516,-36010.5893188364,2022-12-11 125042,李四,河北省斌县,1701360001,1668155793,-35754.8812765556,2022-12-11 125044,王五,宁夏回族自治区慧县,1666972800,1668155829,-98763.5966260757,2022-12-11 125046,赵六,河南省淑兰县,1669824001,1668155873,-42967.791140771,2022-12-11 125047,小兰,重庆市秀英县,1668156174,1668155874,-12472.4318984986,2022-12-11 125073,小梅,西藏自治区玉华县,1669820401,1668157424,79521.9035130631,2022-12-11 153995,小琴,江苏省马鞍山县,1669804707,1670157326,-77601.3264138525,2022-12-11 153996,雅芝,辽宁省佛山市,1669727579,1670157380,-69563.8610820313,2022-12-11 154008,小莉,台湾省哈尔滨市,1675849149,1675848849,-4627.3688887951,2022-12-11 154011,小军,福建省淑兰县,1675854578,1675854278,12256.9309321271,2023-02-08 154012,小刚,宁夏回族自治区乌鲁木齐县,1675854784,1675854484,-60220.5831664783,2023-02-08
在 MongoDB 目标数据库中,执行以下命令,创建集合名称:
db.createCollection("mysql2mongo")
任务完成新建后,进入 Shell 任务编辑界面,进行以下脚本编辑:
Shell 脚本编辑
echo '{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "username", //数据库登录账号信息 "password": "password", //数据库登录密码信息 "column": [ // 表中需要同步的字段名称集合,使用 JSON 的数组描述字段信息。您可使用*代表默认使用所有列配置,例如['*'] "id", "name", "address", "create_time", "event_time", "price", "date_info" ], "splitPk": "id", //切分建。根据split_pk指定的字段进行数据分片,同步时启动并发任务进行数据同步。推荐使用表主键切分。 "connection": [ { "jdbcUrl": [ "jdbc:mysql://ip:port/databases" // 数据库的JDBC私网地址连接信息,可用数据库IP或实例名称方式填写连接信息。 ], "table": ["mysql_mongodb"] //需同步的数据表名称 } ] } }, "writer": { "name": "mongodbwriter", //Writer 端类型 "parameter": { "address": [ "ip:port" //MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出 ], "userName": "userName", // MongoDB的用户名 "userPassword": "userPassword", //MongoDB对应用户的密码 "dbName": "dbName", //MonogoDB的库名信息 "collectionName": "mysql2mongo", //MonogoDB的集合名 "column": [ //MongoDB的文档列名,以名称和对应的数据类型方式填写。 { "name": "id", "type": "int" }, { "name": "name", "type": "string" }, { "name": "address", "type": "string" }, { "name": "create_time", "type": "Long" }, { "name": "event_time", "type": "Long" }, { "name": "price", "type": "double" }, { "name": "date_info", "type": "date" } ], "writeMode": { "isReplace": "true", //当设置为true时,表示针对相同的replaceKey做更新操作 "replaceKey": "id" //replaceKey指定了每行记录的业务主键。用来做更新时使用 } } } } ], "setting": { "speed": { "channel": "1" //同步任务并发数设置 } } } }' > mysql2mongodb.json python3 ./datax/bin/datax.py mysql2mongodb.json //启动 DataX
说明
脚本配置时,需将以上示例脚本中的注释说明删除后再实际执行。
更多 Reader 和 Writer 插件,详见开源 DataX。
脚本配置完成后,您可进行以下操作,完成任务执行资源配置:
在选取独享计算资源组设置后,网络配置中会默认将独享计算资源组绑定的私有网络、子网、安全组信息填入,且不可修改,您可在创建独享计算资源组时,配置好对应的私有网络信息,详见:资源组管理。
产出数据登记用于记录任务的数据血缘,不会对代码逻辑造成影响,此示例选择默认。
任务配置完成后,依次单击上方操作栏中保存和调试图标按钮,执行编辑好的 Shell 命令,执行成功后,可在界面下方查看运行日志和同步结果。
使用 DataGrip 本地数据库工具,登录 MongoDB 的数据库,执行以下查询命令:
db.getCollection("mysql2mongo")
执行结果如下:
数据验证确认无误后,您可进行后续的调度设置和将任务提交发布到运维中心离线任务运维中执行。
后续任务运维操作详见:离线任务运维。