实时数据采集方案支持您将源数据库的增量数据实时采集至消息队列。该方案场景适用于您将源端增量数据同步至目标端数据源库表中,且增量数据较大或者存在波峰波谷时,建议您先进行数据缓存的中间步骤。本文为您介绍如何创建采集解决方案将 RDS MySQL 实例的 Binlog 采集到集成托管消息队列(即 DataSail 数据源),并在数据同步方案中使用缓存方案,实现增量数据实时同步到下游 Doris 数据表。
注意
手动创建 Topic 时,选择采集类型为“数据库采集”。操作详见 Topic 管理。
源端 MySQL 数据库中,创建以下表结构,并插入 5 条示例数据:
创建 MySQL 表:
CREATE TABLE `demo1` ( `Id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `Name` char(10) DEFAULT NULL, `Address` varchar(400) DEFAULT NULL, `Event_Time` bigint(20) DEFAULT NULL, `Price` double DEFAULT NULL, `Num_Big` bigint(20) DEFAULT NULL, `Num_Float` float(13, 3) DEFAULT NULL, `Datetime_Info` datetime DEFAULT NULL, `Timestamp_Info` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `Tinytext_Info` tinytext, PRIMARY KEY (`Id`) ) ENGINE = InnoDB AUTO_INCREMENT = 456014264 DEFAULT CHARSET = utf8;
MySQL 中插入模拟示例数据:
insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Timestamp_Info, Tinytext_Info) values (1, 'dataleap', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', '2023-03-27 14:24:22', 'KKKKKKKKKK'); insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (2, '数据开发', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', 'KKKKKKKKKK'); insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (3, '数据集成', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', 'KKKKKKKKKK'); insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (4, '数据运维', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', 'KKKKKKKKKK'); insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (5, '数据监控', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', 'KKKKKKKKKK');
前置操作中,源端、目标端数据源配置操作准备完成后,您可以开始进行实时数据采集方案配置:
登录 DataSail 控制台。
在左侧导航栏中选择数据同步方案,进入同步方案配置界面。
单击目录树中项目选择入口,选择已创建的 DataLeap 项目。
单击右上角新建数据同步解决方案按钮,下拉选择实时数据采集按钮,进入实时数据采集方案配置界面
进入配置界面后,您可按实际场景需求,完成方案的基本配置、数据来源配置、数据目标配置、运行配置等流程配置。
基本配置参数说明如下表所示。其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。
配置项 | 说明 |
---|---|
基本信息 | |
*方案名称 | 输入实时数据采集方案名称。只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需在127个字符以内。 |
方案描述 | 输入此方案的描述信息,方便后续维护管理。 |
*保存至 | 下拉选择方案保存路径,此路径为数据开发项目中的任务路径。创建方式详见任务目录树管理。 |
*链路类型 | 下拉选择来源和目标端数据源类型。本实践来源选择 MySQL,目标选择 DataSail(内置 Topic)。 |
网络与资源配置 | |
*数据来源 | 下拉选择数据源管理中创建成功的 MySQL 源端数据源名称,支持选择多个。 |
*数据目标 | 下拉选择数据源管理中创建成功的 DataSail 目标端数据源名称。 |
*实时集成任务资源组 | 下拉选择 DataLeap 项目控制台中已绑定的独享数据集成资源组:
|
方案基本配置完成后,单击右下角下一步按钮,进行方案的数据来源配置。
在数据来源配置界面中,完成数据来源设置与库表映射规则匹配策略:
其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。
配置项 | 说明 |
---|---|
数据源设置 | |
*数据源名称 | 下拉选择数据源管理中创建成功的 MySQL 数据源。 |
*订阅格式 | 下拉选择 Debezium Json 类型的数据订阅格式。 |
*源库、源表选择限定条件 | 单击添加规则按钮,完成源库表的限定条件规则添加,支持添加多个规则,取合集,在本实践中:
|
表 | 单击获取源表按钮,平台根据源库、源表限定条件的设置,自动加载符合条件的表。 |
映射规则 | |
*库表匹配策略 | 选择自定义映射规则匹配策略,您可将目标表名,配置为已创建好的 Topic 名称;或设置为变量参数:“${table_name_src_transed}”,后续执行方案时,平台便将为您自动创建与源端 MySQL 同名的 Topic 名称。 |
数据来源配置完成后,单击右下角下一步按钮,进行方案的数据目标配置。
在数据目标配置界面,单击“刷新逻辑表和目标表映射”,生成源表和目标 Topic 的采集关系。
数据目标配置完成后,单击下一步,进行方案运行配置,资源设置您可使用默认设置,也可通过自定义方式设置资源参数,本次实践中,资源设置参数如下:
运行配置设置完成后,您可单击下方提交方案按钮,并在弹窗中,勾选“立即执行”,并单击确定按钮,等待方案执行完成。
方案执行完成后,您可切换到数据采集 > Topic 管理界面中,查看对应 Topic 状态详情:
说明
实时数据采集更多详细操作详见实时数据采集方案。
实时数据采集方案创建完成后,您可实际场景,按需创建实时整库同步或分库分表实时同步方案,将 MySQL Binlog 数据写入到 Doris 目标数据源中。本实践中,将以创建分库分表实时同步方案为例。
在左侧导航栏中选择数据同步方案,进入同步方案配置界面。
单击目录树中项目选择入口,选择已创建的 DataLeap 项目。
单击右上角新建数据同步解决方案按钮,下拉选择实时分库分表同步按钮,进入分库分表实时同步方案配置界面。
在基本配置界面,完成以下参数说明:
其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。
配置项 | 说明 |
---|---|
基本信息 | |
*方案名称 | 输入实时分库分表同步方案名称。只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需在127个字符以内。 |
方案描述 | 输入此方案的描述信息,方便后续维护管理。 |
*保存至 | 下拉选择方案保存路径,此路径为数据开发项目中的任务路径。创建方式详见任务目录树管理。 |
*链路类型 | 下拉选择来源 MySQL 和目标端 Doris 数据源类型。 |
网络与资源配置 | |
*数据来源 | 下拉选择数据源管理中创建成功的 MySQL 源端数据源名称,支持选择多个。 |
数据缓存 | 选择同步解决方案是否需要通过数据缓存的方式来采集数据,这里勾选使用缓存, 需选择已创建成功的 DataSail(内置 Topic) 数据源缓存方式。 |
*数据目标 | 下拉选择数据源管理中创建成功的 Doris 目标端数据源名称。 |
离线/实时集成任务资源组 | 下拉选择在数据源管理界面中,创建来源、中间缓存、目标端数据源时,测试连通性能够成功的独享集成资源组。选择资源组后,您可进行以下操作:
若此前还未创建独享集成资源组,您也可以单击资源组管理按钮,前往资源组管理界面进行资源组的查看或新建等操作,详见资源组管理。 |
方案基本配置完成后,单击右下角下一步按钮,进行方案的数据来源配置。
在数据来源配置界面中,完成数据来源的选择,数据源库表名称支持通过正则匹配。
其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。
配置项 | 说明 |
---|---|
数据源设置 | |
*数据源名称 | 展现基本配置中选择的数据源,您也可以在此处再新增其余需要采集的数据源。 |
*源库、源表选择限定条件 | 单击添加规则按钮,完成源库表的限定条件规则添加,支持添加多个规则,取合集,在本实践中:
|
表 | 单击获取源表/获取源集合按钮,平台根据源库、源表/集合限定条件的设置,自动加载符合条件的表/集合。 注意 选择的多个库表,需保证其 Schema 信息一致(包括字段名称、字段类型)。 |
映射规则 | |
*库表匹配策略 | 选择自定义映射规则匹配策略,设置库表名的转换规则,在目标库表名框中输入相应已创建好的 Doris 库表名称信息。 |
数据来源配置完成后,单击右下角下一步按钮,进行方案的数据缓存配置。
在数据缓存配置界面,同步方式选择“增加缓存”,且数据源来自“DataSail(内置 Topic)”类型,并完成以下映射配置:
在数据目标配置界面,单击“刷新逻辑表和目标表映射”,生成源表和目标表的映射关系,您可根据实际情况,设置目标表的分片键、分表键、是否全量同步等配置项。
数据目标配置完成后,单击右下角下一步按钮,进行方案的运行配置。
在运行配置界面,您可根据实际情况设置以下信息:
离线全量同步
其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。
配置项 | 说明 |
---|---|
*离线集成任务资源组 | 下拉选择 DataLeap 项目控制台中已绑定且处于健康状态的独享数据集成资源组。 |
*默认 Quota 数 | 设置可同时提交执行的集成任务数量,可根据独享集成资源组规格进行配置,如资源组的大小为 40CU,则 Quota 配置需必须小于 20(40/2),否则会因资源问题导致任务执行时异常。 |
*期望最大并发数 | 设置离线任务同步时,可以从源端并行读取或并行写入目标端的最大线程数。 |
脏数据设置 | 按需勾选设置任务执行过程中的脏数据处理方式。 |
集成高级参数设置 | 打开高级参数输入按钮,根据实际业务要求,以 Key/Value 形式,在编辑框中输入离线任务所需的高级参数。支持参数详见各数据源中的高级参数介绍。 |
实时增量同步
设置解决方案中实时增量任务的运行参数情况。
其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。
配置项 | 说明 |
---|---|
*实时集成任务资源组 | 下拉选择 DataLeap 项目控制台中已绑定且处于健康状态的独享数据集成资源组。 |
*资源设置 | 可通过自定义和默认两种设置方式,进行实时任务运行资源的设定,如单TaskManager CPU数量、单TaskManager内存大小、JobManager CPU数量等。 说明 默认设置中,各运行资源设置如下:
|
集成高级参数设置 | 打开高级参数输入按钮,根据实际业务要求,以 Key/Value 形式,在编辑框中输入实时任务所需的高级参数。支持参数详见各数据源中的高级参数介绍。 |
Flink 运行参数设置 | 支持输入 Flink 相关的动态参数和执行参数,具体参数设置详见 Flink 官方文档。 |
调度设置
离线全量任务还需要任务调度资源组,来支持任务下发分配至独享数据集成资源组中运行,目前调度资源组支持选择公共调度资源组和独享调度资源组。
提交方案
方案运行配置完成后,单击右下角提交方案按钮,进行方案的提交,在弹窗中,您可根据实际情况勾选方案立即执行,并单击确定按钮,完成实时数据同步解决方案的创建。
说明
更多详细操作详见实时分库分表。
等待实时采集方案、实时分库分表方案执行完成后,您可前往 Doris 集群目的端数据库中,验证数据的实时接收情况。
登录到 Doris 集群后,执行以下语句,进入到 Doris 数据库中:
说明
Doris 数据库用户名密码信息和其余更多操作,详见 Doris 基础使用说明。
mysql -h 127.0.0.1 -P9030 -u root -p
执行以下语句,进入目的端数据库并查询 demo1 表数据:
select * from demo1;
可以在 Doris 表中看到历史的全量数据。
登录 MySQL 数据库,继续往数据库中插入以下数据:
insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (6, '数据治理', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157,-19505.969, '2023-03-27 14:24:22','KKKKKKKKKK'); insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (7, '数据地图', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157,-19505.969, '2023-03-27 14:24:22','KKKKKKKKKK');
MySQL 数据插入执行完成后,您便可在 Doris 数据库表中,查询到实时插入的数据:
实时数据查询验证无误后,后续您也可前往数据采集 > Topic管理 > 云监控界面,查看 Binlog 采集流量等监控信息。