You need to enable JavaScript to run this app.
导航
采集 MySQL Binlog 到数据集成托管消息队列
最近更新时间:2024.11.21 14:03:43首次发布时间:2024.03.11 15:21:52

实时数据采集方案支持您将源数据库的增量数据实时采集至消息队列。该方案场景适用于您将源端增量数据同步至目标端数据源库表中,且增量数据较大或者存在波峰波谷时,建议您先进行数据缓存的中间步骤。本文为您介绍如何创建采集解决方案将 RDS MySQL 实例的 Binlog 采集到集成托管消息队列(即 DataSail 数据源),并在数据同步方案中使用缓存方案,实现增量数据实时同步到下游 Doris 数据表。

1 前置操作

2 注意事项

  • 目标表创建:将源端表数据写入到目标表中,目标表创建可以是以下两种方式:
    • 使用已有表:已经手动在数据采集-Topic管理中创建好 Topic。采集方案步骤执行时,将跳过创建 Topic 的流程。建议手动进行 Topic 的创建,这样可以更加灵活地配置 Topic 的分区数和生命周期。

      注意

      手动创建 Topic 时,选择采集类型为“数据库采集”。操作详见 Topic 管理

    • 自动建表:若在数据采集-Topic管理中还没有目标 Topic,此时采集方案步骤执行时,会自动在流程中创建同名的目标 Topic。
  • 采集任务位点初始化:采集方案执行时,默认情况下会从最新的 Binlog 位点开始采集。您也可以重置点位,选择为 MySQL 实例中存在的任意 Binlog 位点。通常情况下我们会进行一次数据库表的全量同步,在此之前我们只需要从最新 Binlog 位点采集即可。
  • 在分库分表场景中,我们通常需要同时采集多个 MySQL 实例的 Binlog 数据。一个采集解决方案不建议配置采集太多的数据源,一般建议同时采集4个左右数据源是比较合适的。您也可以根据 MySQL 实例的 Binlog 产生速度进行采集数据源数量的调整。

3 操作步骤

4 操作流程

4.1 准备模拟数据

源端 MySQL 数据库中,创建以下表结构,并插入 5 条示例数据:

  • 创建 MySQL 表:

    CREATE TABLE `demo1`
    (
        `Id`             bigint(20) unsigned NOT NULL AUTO_INCREMENT,
        `Name`           char(10)                     DEFAULT NULL,
        `Address`        varchar(400)                 DEFAULT NULL,
        `Event_Time`     bigint(20)                   DEFAULT NULL,
        `Price`          double                       DEFAULT NULL,
        `Num_Big`        bigint(20)                   DEFAULT NULL,
        `Num_Float`      float(13, 3)                 DEFAULT NULL,
        `Datetime_Info`  datetime                     DEFAULT NULL,
        `Timestamp_Info` timestamp           NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        `Tinytext_Info`  tinytext,
        PRIMARY KEY (`Id`)
    ) ENGINE = InnoDB
      AUTO_INCREMENT = 456014264
      DEFAULT CHARSET = utf8;
    
  • MySQL 中插入模拟示例数据:

    insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Timestamp_Info, Tinytext_Info) values (1, 'dataleap', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', '2023-03-27 14:24:22', 'KKKKKKKKKK');
    
    insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (2, '数据开发', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', 'KKKKKKKKKK');
    
    insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (3, '数据集成', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', 'KKKKKKKKKK');
    
    insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (4, '数据运维', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', 'KKKKKKKKKK');
    
    insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (5, '数据监控', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157, -19505.969, '2023-03-27 14:24:22', 'KKKKKKKKKK');
    

4.2 新建实时数据采集方案

前置操作中,源端、目标端数据源配置操作准备完成后,您可以开始进行实时数据采集方案配置:

  1. 登录 DataSail 控制台

  2. 在左侧导航栏中选择数据同步方案,进入同步方案配置界面。

  3. 单击目录树中项目选择入口,选择已创建的 DataLeap 项目。

  4. 单击右上角新建数据同步解决方案按钮,下拉选择实时数据采集按钮,进入实时数据采集方案配置界面

  5. 进入配置界面后,您可按实际场景需求,完成方案的基本配置、数据来源配置、数据目标配置、运行配置等流程配置。

  6. 基本配置参数说明如下表所示。其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。

    配置项

    说明

    基本信息

    *方案名称

    输入实时数据采集方案名称。只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需在127个字符以内。

    方案描述

    输入此方案的描述信息,方便后续维护管理。

    *保存至

    下拉选择方案保存路径,此路径为数据开发项目中的任务路径。创建方式详见任务目录树管理

    *链路类型

    下拉选择来源和目标端数据源类型。本实践来源选择 MySQL,目标选择 DataSail(内置 Topic)

    网络与资源配置

    *数据来源

    下拉选择数据源管理中创建成功的 MySQL 源端数据源名称,支持选择多个。

    *数据目标

    下拉选择数据源管理中创建成功的 DataSail 目标端数据源名称。

    *实时集成任务资源组

    下拉选择 DataLeap 项目控制台中已绑定的独享数据集成资源组:

    • 选择的资源组,需要确保能与源端、目标端数据源连通,您可单击下方的立即连通性测试按钮,进入测试窗口,单击连通性测试按钮,测试选择的资源组是否可以和两侧数据源连通成功。
    • 您也可以单击资源组管理按钮,前往资源组管理界面进行资源组的查看或新建等操作,详见资源组管理

    方案基本配置完成后,单击右下角下一步按钮,进行方案的数据来源配置。

  7. 在数据来源配置界面中,完成数据来源设置与库表映射规则匹配策略:
    其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。

    配置项

    说明

    数据源设置

    *数据源名称

    下拉选择数据源管理中创建成功的 MySQL 数据源。

    *订阅格式

    下拉选择 Debezium Json 类型的数据订阅格式。

    *源库、源表选择限定条件

    单击添加规则按钮,完成源库表的限定条件规则添加,支持添加多个规则,取合集,在本实践中:

    • 源库限定条件设置为 MySQL 数据库名,类型为个例;
    • 源表限定条件设置为 MySQL 数据表,类型为个例;

    Image

    单击获取源表按钮,平台根据源库、源表限定条件的设置,自动加载符合条件的表。

    映射规则

    *库表匹配策略

    选择自定义映射规则匹配策略,您可将目标表名,配置为已创建好的 Topic 名称;或设置为变量参数:“${table_name_src_transed}”,后续执行方案时,平台便将为您自动创建与源端 MySQL 同名的 Topic 名称。
    Image
    更多自定义配置示例详见9.3 自定义库表匹配策略示例

    数据来源配置完成后,单击右下角下一步按钮,进行方案的数据目标配置

  8. 在数据目标配置界面,单击“刷新逻辑表和目标表映射”,生成源表和目标 Topic 的采集关系。
    Image

  9. 数据目标配置完成后,单击下一步,进行方案运行配置,资源设置您可使用默认设置,也可通过自定义方式设置资源参数,本次实践中,资源设置参数如下:
    Image

  10. 运行配置设置完成后,您可单击下方提交方案按钮,并在弹窗中,勾选“立即执行”,并单击确定按钮,等待方案执行完成。

  11. 方案执行完成后,您可切换到数据采集 > Topic 管理界面中,查看对应 Topic 状态详情:
    Image

说明

实时数据采集更多详细操作详见实时数据采集方案

4.3 新建数据实时分库分表同步

实时数据采集方案创建完成后,您可实际场景,按需创建实时整库同步分库分表实时同步方案,将 MySQL Binlog 数据写入到 Doris 目标数据源中。本实践中,将以创建分库分表实时同步方案为例。

  1. 在左侧导航栏中选择数据同步方案,进入同步方案配置界面。

  2. 单击目录树中项目选择入口,选择已创建的 DataLeap 项目。

  3. 单击右上角新建数据同步解决方案按钮,下拉选择实时分库分表同步按钮,进入分库分表实时同步方案配置界面。

  4. 在基本配置界面,完成以下参数说明:
    其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。

    配置项

    说明

    基本信息

    *方案名称

    输入实时分库分表同步方案名称。只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需在127个字符以内。

    方案描述

    输入此方案的描述信息,方便后续维护管理。

    *保存至

    下拉选择方案保存路径,此路径为数据开发项目中的任务路径。创建方式详见任务目录树管理

    *链路类型

    下拉选择来源 MySQL 和目标端 Doris 数据源类型。

    网络与资源配置

    *数据来源

    下拉选择数据源管理中创建成功的 MySQL 源端数据源名称,支持选择多个。

    数据缓存

    选择同步解决方案是否需要通过数据缓存的方式来采集数据,这里勾选使用缓存, 需选择已创建成功的 DataSail(内置 Topic) 数据源缓存方式。

    *数据目标

    下拉选择数据源管理中创建成功的 Doris 目标端数据源名称。

    离线/实时集成任务资源组

    下拉选择在数据源管理界面中,创建来源、中间缓存、目标端数据源时,测试连通性能够成功的独享集成资源组。选择资源组后,您可进行以下操作:

    1. 单击下方的立即测试连通性按钮,进入连通性测试界面。
    2. 单击连通性测试按钮,完成离线全量、实时增量任务来源和目标端数据源的连通。

    若此前还未创建独享集成资源组,您也可以单击资源组管理按钮,前往资源组管理界面进行资源组的查看或新建等操作,详见资源组管理

    方案基本配置完成后,单击右下角下一步按钮,进行方案的数据来源配置。

  5. 在数据来源配置界面中,完成数据来源的选择,数据源库表名称支持通过正则匹配。
    其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。

    配置项

    说明

    数据源设置

    *数据源名称

    展现基本配置中选择的数据源,您也可以在此处再新增其余需要采集的数据源。

    *源库、源表选择限定条件

    单击添加规则按钮,完成源库表的限定条件规则添加,支持添加多个规则,取合集,在本实践中:

    • 源库限定条件设置为 MySQL 数据库名,类型为个例;
    • 源表限定条件设置为 MySQL 数据表,类型为个例;

    Image

    单击获取源表/获取源集合按钮,平台根据源库、源表/集合限定条件的设置,自动加载符合条件的表/集合。

    注意

    选择的多个库表,需保证其 Schema 信息一致(包括字段名称、字段类型)。
    数据字段模块内系统默认展示第一个数据源中第一张表的元数据字段信息,若多表间字段不一致可能会导致运行失败。

    映射规则

    *库表匹配策略

    选择自定义映射规则匹配策略,设置库表名的转换规则,在目标库表名框中输入相应已创建好的 Doris 库表名称信息。
    Image

    数据来源配置完成后,单击右下角下一步按钮,进行方案的数据缓存配置。

  6. 在数据缓存配置界面,同步方式选择“增加缓存”,且数据源来自“DataSail(内置 Topic)”类型,并完成以下映射配置:

    1. 单击刷新数据源和Topic映射按钮,根据数据源类型,进行相关的映射配置。
    2. 您可在下拉框中,选择已创建的来源、目标端数据源、Topic 名称和绑定采集解决方案信息。
      Image
    3. 缓存配置完成后,单击右下角下一步按钮,进行方案的数据目标配置。
  7. 在数据目标配置界面,单击“刷新逻辑表和目标表映射”,生成源表和目标表的映射关系,您可根据实际情况,设置目标表的分片键、分表键、是否全量同步等配置项。
    Image
    数据目标配置完成后,单击右下角下一步按钮,进行方案的运行配置。

  8. 在运行配置界面,您可根据实际情况设置以下信息:

    1. 离线全量同步
      其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。

      配置项

      说明

      *离线集成任务资源组

      下拉选择 DataLeap 项目控制台中已绑定且处于健康状态的独享数据集成资源组。

      *默认 Quota 数

      设置可同时提交执行的集成任务数量,可根据独享集成资源组规格进行配置,如资源组的大小为 40CU,则 Quota 配置需必须小于 20(40/2),否则会因资源问题导致任务执行时异常。

      *期望最大并发数

      设置离线任务同步时,可以从源端并行读取或并行写入目标端的最大线程数。
      并发数影响数据同步的效率,并发设置越高对应资源消耗也越多,由于资源原因或者任务本身特性等原因,实际执行时并发数可能小于等于设置的期望最大并发数。

      脏数据设置

      按需勾选设置任务执行过程中的脏数据处理方式。

      集成高级参数设置

      打开高级参数输入按钮,根据实际业务要求,以 Key/Value 形式,在编辑框中输入离线任务所需的高级参数。支持参数详见各数据源中的高级参数介绍

    2. 实时增量同步
      设置解决方案中实时增量任务的运行参数情况。
      其中名称前带 * 的参数为必填参数,名称前未带 * 的参数为可选参数。

      配置项

      说明

      *实时集成任务资源组

      下拉选择 DataLeap 项目控制台中已绑定且处于健康状态的独享数据集成资源组

      *资源设置

      可通过自定义和默认两种设置方式,进行实时任务运行资源的设定,如单TaskManager CPU数量、单TaskManager内存大小、JobManager CPU数量等。

      说明

      默认设置中,各运行资源设置如下:

      • 单 TaskManager CPU 数:2
      • 单 TaskManager 内存:4096 MB
      • 单 TaskManager slot 数:4
      • JobManager CPU 数:1
      • JobManager 内存:2048 MB

      集成高级参数设置

      打开高级参数输入按钮,根据实际业务要求,以 Key/Value 形式,在编辑框中输入实时任务所需的高级参数。支持参数详见各数据源中的高级参数介绍

      Flink 运行参数设置

      支持输入 Flink 相关的动态参数和执行参数,具体参数设置详见 Flink 官方文档

    3. 调度设置
      离线全量任务还需要任务调度资源组,来支持任务下发分配至独享数据集成资源组中运行,目前调度资源组支持选择公共调度资源组独享调度资源组

  9. 提交方案
    方案运行配置完成后,单击右下角提交方案按钮,进行方案的提交,在弹窗中,您可根据实际情况勾选方案立即执行,并单击确定按钮,完成实时数据同步解决方案的创建。

说明

更多详细操作详见实时分库分表

4.4 验证上下游数据同步

等待实时采集方案、实时分库分表方案执行完成后,您可前往 Doris 集群目的端数据库中,验证数据的实时接收情况。

  1. 前往 EMR 控制台,并登录相应的 Doris 集群。详见登录集群

  2. 登录到 Doris 集群后,执行以下语句,进入到 Doris 数据库中:

    说明

    Doris 数据库用户名密码信息和其余更多操作,详见 Doris 基础使用说明

    mysql -h 127.0.0.1 -P9030 -u root -p
    
  3. 执行以下语句,进入目的端数据库并查询 demo1 表数据:

    select * from demo1;
    

    Image
    可以在 Doris 表中看到历史的全量数据。

  4. 登录 MySQL 数据库,继续往数据库中插入以下数据:

    insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (6, '数据治理', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157,-19505.969, '2023-03-27 14:24:22','KKKKKKKKKK');
    
    insert into demo1 (Id, Name, Address, Event_Time, Price, Num_Big, Num_Float, Datetime_Info, Tinytext_Info) values (7, '数据地图', '六盘水市安次荆门路i座_282720', 1679897962, 87496.4557384283, 59534049432157,-19505.969, '2023-03-27 14:24:22','KKKKKKKKKK');
    
  5. MySQL 数据插入执行完成后,您便可在 Doris 数据库表中,查询到实时插入的数据:
    Image

  6. 实时数据查询验证无误后,后续您也可前往数据采集 > Topic管理 > 云监控界面,查看 Binlog 采集流量等监控信息。
    Image