You need to enable JavaScript to run this app.
导航
离线数据同步
最近更新时间:2024.12.11 15:20:15首次发布时间:2021.09.14 11:21:01

DataSail 全域数据集成提供离线集成数据同步能力,支持丰富的异构数据源之间,进行高速稳定的数据同步操作,将源端数据库中的数据,通过离线的方式,同步至目标数据库中,实现目标库和源库的数据保持对应。
本文将为您介绍离线数据同步的操作说明。

1 数据源

数据集成使用的数据源,需要先在项目控制台下注册。离线数据同步支持的数据源类型,请参见:支持的数据源
数据源需要在连通网络的前提下进行数据同步,相关说明请参见:配置网络连通
数据源的具体配置参数信息,请参见:配置数据源

2 创建离线数据集成

创建离线数据集成任务的步骤如下:

  1. 登录 DataLeap 租户控制台
  2. 在左侧导航栏,单击项目管理,进入项目列表界面。
  3. 单击相应的项目名称,进入到数据开发界面。
    Image
  4. 在数据开发界面,单击目录树上新建任务按钮,进入新建任务界面。
  5. 选择任务类型:
    1. 分类:数据集成
    2. 选择任务:离线集成
  6. 填写任务基本信息:
    1. 任务名称:输入任务的名称,只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。
    2. 保存至: 选择任务存放的目标文件夹目录。
  7. 单击确定按钮,完成任务创建。

3 数据来源/目标配置

新建离线集成任务完成后,进入离线集成任务可视化配置界面,配置以下数据来源/目标项配置:

说明

  • 各数据源配置说明,详见数据源列表
  • 部分数据源写入时,需要选择数据写入方式。针对不同的数据源,有不同的写入方式。
  1. 按实际业务场景配置数据来源信息。
  2. 按需选择需写入目标数据源信息。

Image

4 字段映射配置

数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。
字段映射支持选择基础模式转换模式配置映射。

说明

基础模式和转换模式不支持互相切换,模式切换后,将清空现有字段映射中所有配置信息,一旦切换无法撤销,需谨慎操作。

4.1 转换模式

字段映射支持数据转换,您可根据实际业务需求进行配置,将源端采集的数据,事先对其进行各种数据转换操作后,以指定格式输入到目标端数据库中,满足不同业务场景需求。
转换模式可应用于各种轻量级数据处理场景,如:

  • 数据清洗:可以使用转换模式过滤冗余数据、处理缺失值、纠正数据错误等场景。
  • 数据预处理:可以使用转换模式对数据进行标准化、归一化、离散化等预处理,以便更好地分析和处理数据。
  • 数据转换:可以使用转换模式将数据从一种格式转换为另一种格式。

注意

目前转换模式支持所有的离线集成作业,以及 MQ(例如 Kafka)到 Hive/HDFS 的实时集成作业。

您可通过以下方式进入转换模式配置界面:

  1. 参考上方步骤2 创建离线数据集成,进入可视化任务配置界面。
  2. 按需配置数据来源和目标端数据源信息。
  3. 单击“转换模式”按钮,进入转换模式配置界面。

Image
在转换模式中,你需依次配置:来源节点、数据转换、目标节点信息。

4.1.1 配置来源节点

默认情况下,来源节点目标节点会自动添加到节点目录列表中,如下图所示。

  1. 单击来源节点下的 Source 节点,右侧进行数据来源的字段信息配置。
  2. 配置来源节点名称信息,您可自定义输入来源节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
  3. 配置数据字段信息,您可通过自动添加、手动添加两种模式来添加数据来源字段信息。
  4. 来源节点信息配置完成后,单击确认按钮,此时节点目录列表中的来源节点名称左侧灰点(·),会变成绿点(·),则表示来源节点配置完成。

4.1.2 添加转换节点

数据转换节点支持添加 SQL 转换节点,支持通过 Flink SQL 函数来实现多种转换操作,例如删除列、重命名字段、数据类型转换等。添加 SQL 转换节点操作如下:

  1. 单击数据转换节点右侧添加按钮,选择 SQL 转换方式,配置转换信息和规则。

  2. 配置转换节点名称,您可自定义输入节点转换名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。

  3. 输入 SQL 脚本转换规则,目前仅支持添加一个转换的 SQL 语句,且不能包括 “;”。
    以下是 Flink SQL 相关的转换示例(假设源表名为 Source):

    • 字段重组 / 拆分 / 合并

      -- 字段重组: 源表有 a, b, c 三个字段,但下游目的表只有 c, b 两个字段
      SELECT c, b FROM Source
      
      -- 字段合并: 把源表的 first_name 字段和 last_name 字段用空格连接起来,并作为一个新的字段 full_name
      SELECT first_name || ' ' || last_name AS full_name FROM Source
      
      -- 字段拆分: 把源表的 full_name 字段根据空格做拆分,并产生 first_name 和 last_name 两个新字段
      SELECT SPLIT_INDEX(full_name, ' ', 1) AS first_name, SPLIT_INDEX(full_name, ' ', 2) AS last_name FROM Source
      
    • 时间格式转换

      -- 10 位 Unix 时间戳 字符串转换成 日期和日期时间字符串(年月日、年月日时分秒)
      SELECT id, `name`, FROM_UNIXTIME(CAST(t_time AS BIGINT),'yyyy-MM-dd') as t_time FROM Source
      SELECT id, `name`, FROM_UNIXTIME(CAST(t_time AS BIGINT),'yyyy-MM-dd HH:mm:ss') as t_time FROM Source
      
      -- 13 位 Unix 时间戳 字符串转换成 日期和日期时间字符串(年月日、年月日时分秒)
      SELECT id, `name`, FROM_UNIXTIME(CAST(t_time AS BIGINT)/1000,'yyyy-MM-dd') as t_time FROM Source
      
      SELECT id, `name`, FROM_UNIXTIME(CAST(t_time AS BIGINT)/1000,'yyyy-MM-dd HH:mm:ss') as t_time FROM Source
      
      -- 当前日期转换成 yyyy-mm-dd
      SELECT id, `name`, DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd') as t_time FROM Source
      
      SELECT  CONVERT_TZ(REPLACE(acceptTime, 'T', ' '), 'UTC', 'UTC') AS acceptTime  FROM Source
      
    • 条件判断

      -- 判断 name 字段是否仅包含空白字符, 如果是的话就输出 NULL, 否则输出原字符串
      SELECT
          CASE
               WHEN IS_SPACES(name) THEN NULL
               ELSE name
          END
      FROM Source
      
      -- 按条件过滤数据
      SELECT name, age FROM Source WHERE age > 10
      
    • 字段截取

      SELECT id, o_name, SUBSTRING(tb, 2) FROM Source
      
    • 条件过滤

      SELECT id, o_name FROM Source where id>100 and name='test'
      

    更多 SQL 节点支持的转换函数列表,详见Flink 1.11 的所有标量函数
    以下为 DataSail 内部特有的一些函数说明:

    函数名

    用途

    返回值类型

    示例

    IS_NUMBER

    判断一个字符串是否是数字格式 。

    Boolean

    IS_NUMBER(price),如果 price 是 '1''2.3' 等数字格式的字符串,则返回 true

    IS_SPACES

    判断一个字符串是否全由空格字符组成。

    Boolean

    IS_SPACES(name),如果 name 是 ' ' 等空格字符组成的字符串,则返回 true

    GET_JSON_OBJECT
    GET_JSON_STRING(只返回 String)

    按照给定的 JSONPath 路径,从给定 JSON 字符串中提取相应的字段值。
    如果获取不到给定的 JSONPath 路径,则返回第三个参数指定的默认值。

    参数的类型

    GET_JSON_OBJECT(json_field, '$.b', 'None')

    • 如果 json_field 是 '{"a":1,"b":"hello"}" 则返回 'hello'
    • 如果 json_field 没有 b 字段,则返回给定的默认值 'None'

    注意

    请务必设置非 null 的默认值,因为函数的输出类型将由该字段的类型来定义。如果确实需要 null 值作为默认值,请配合 CASE WHEN 语句使用。

    CREATE_LIST

    根据指定的参数,创建一个 List 对象。
    通常用于构造 GET_JSON_OBJECT 的第三个参数(默认值),以便该函数返回 List 类型的结果。

    List<参数的类型>

    • 空列表:CREATE_LIST()
    • 整数列表:CREATE_LIST(1, 2, 3)
    • 字符串列表:CREATE_LIST('a', 'b', 'c')

    TO_JSON

    将一个 Array(List)或 Map 等复杂对象转为 JSON 字符串

    String

    • MAP 字段转为 JSON 字符串:TO_JSON(map_field)
    • LIST 字段转为 JSON 字符串:TO_JSON(list_field)

    SUBSTRING

    截取字符串

    String

    SUBSTRING(string_field FROM 2 FOR 4)

    SUBSTR

    截取字符串

    String

    SUBSTR(string_field, 2, 4)

    AES_ENCRYPT

    AES 加密函数

    String

    AES_ENCRYPT(string_field, key)
    key: 是 base64(密码) ,密码长度需要 32 字节

  4. SQL 输入完成后,您也可单击输入框上方格式化按钮,对输入的 SQL 进行格式化操作。

  5. 转换信息配置完成后,单击确认按钮,在通过检查后,节点目录列表中的转换节点名称左侧灰点(·),也会变成绿点(·)表示转换节点配置完成;若未通过检查,则会有报错提示,需根据信息修改参数。

Image

4.1.3 配置目标节点

与来源节点类似,配置目标节点 Sink 信息:

  1. 单击目标节点下的 Sink 节点,右侧进行数据目标的字段信息配置。
  2. 配置目标节点名称信息,您可自定义输入目标节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
  3. 配置目标数据字段信息,您可通过自动添加、手动添加两种模式来添加数据来源字段信息。

    说明

    目标节点的字段名称应当与上游 SQL 节点的输出字段保持一致,避免运行时异常。
    例如上游是 SQL 转换节点,SQL 语句为 SELECT name || '-SUFFIX' AS new_name, address FROM Source,那么目标节点的字段需要为 new_nameaddress

  4. 目标节点信息配置完成后,单击确认按钮,确认节点名称左侧变成绿点(·)后,表示目标节点配置完成。

4.1.4 执行转换调试

数据转换节点配置完成后,当所有节点左侧都变成绿点,则表明可以将任务提交执行了。对于离线集成作业,可以单击上方操作栏中的调试按钮开始执行。
Image

注意

  • 若后续对数据转换节点做任何变更(包括重命名、修改参数、删除节点等操作),均会导致当前节点和下游所有节点的状态,由绿色变成灰色。此时,您需要依次单击每个节点中的确认按钮,确认所有节点重新变成绿色后,方可继续提交执行任务。否则将会有如下报错信息提示:
    Image
  • 对于 SQL 节点,我们建议只使用 Flink 1.11 中的标量函数来完成轻量级数据转换,请避免使用 JOIN、GROUP BY、DISTINCT、WINDOW 等带状态的语句,以免造成作业运行缓慢甚至崩溃。
  • SQL 节点当前仅能添加一个。

4.2 基础模式

基础模式您可通过以下几种方式操作字段映射关系:

  • 单击自动添加,可以根据数据源 Schema 自动添加字段。部分数据源类型支持获取schema自动添加字段。
  • 单击手动添加,可以添加一条空白的字段信息,手工输入字段名、类型等。
  • 单击同名映射,可根据目标端/源端的字段添加情况,进行同名映射添加源端/目标端的字段信息。

    说明

    同名映射能力,仅部分数据源支持,如 FTP/SFTP 数据源。

  • 单击删除全部,可以删除全部字段信息。
  • 通过拖拽字段左侧的按钮,可以调整字段顺序。

注意

请注意列与列之间映射的字段类型是否数据兼容。

Image

5 任务运行参数

字段映射配置完成后,您可按需设置以下任务运行参数:

参数

说明

期望最大并发数

数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。

脏数据设置

任务中字段映射没有匹配到的数据,如格式非法,或源端数据进入目标端时发生了异常。例如:源端是 String 类型的数据写到 INT 类型的目标字段中,因为类型转换不合理而无法写入的数据。
您可以在同步任务配置时,设置脏数据的最大容忍条数/比例。如果配置0,表示不允许脏数据存在,任务会运行失败退出。

自定义参数设置

  • 单行编辑模式,需要设置参数 Key和value 值;
  • 脚本编辑模式,需要输入json串参数。

各数据源常见高级参数使用详见高级参数

Image

6 脚本模式配置

  1. 在可视化任务编辑界面,单击上方工具栏切换至脚本模式按钮,进入编辑界面。

    注意

    切换脚本模式将清空现有可视化界面配置,一旦切换无法撤销。

    Image
  2. 首次使用 DSL 模式配置时,您可通过单击界面导入脚本模板按钮,在模板的基础上,进行相应配置的修改,提升任务配置效率。各数据源脚本配置说明,详见数据源配置
    Image

    注意

    导入新的脚本模版将清空现有内容,一旦导入无法撤销。

7 调度属性/集成资源组配置

任务执行逻辑配置完成后,您可进行后续的调度属性和数据集成资源组相关配置:

  • 配置调度属性:
    1. 单击右侧的调度设置 ,进入该任务的调度设置页面。
    2. 可以设置离线集成任务的运行周期、运行时间、调度依赖等属性。 更多说明请参见:任务调度设置
  • 配置数据集成资源组
    1. 单击右侧的数据集成资源组 ,进入数据集成资源组页面。
    2. 下拉选择在项目控制台中,已完成资源组连通性测试的数据集成资源组,保障任务网络能够连通。

      说明

      下拉展现的数据集成资源组,您需先在项目控制台 > 配置信息 > 服务绑定中,进行数据集成资源组的绑定操作。详见4 独享集成资源组使用

8 调试任务

任务代码逻辑和参数配置完成后,您可在编辑器上方,单击操作栏中的保存调试按钮,进行任务调试。

注意

调试操作,直接使用线上数据进行调试,需谨慎操作。

8.1 调试记录

调试任务开始运行后,可在下方查看调试记录,单击调试记录按钮,可以查看以下详情内容:
Image

  • 概览:查看任务执行的状态、业务日期、运行时长、开始时间等任务概览信息。
  • 监控:离线集成任务支持查看任务读写监控指标,您可单击数据集成监控按钮,前往云监控界面查看集成读写指标、资源组使用率等监控信息。
  • 调度日志:查看任务在调度分发阶段时的日志信息。
  • 执行日志:查看任务实际在独享集成资源组中运行时的执行日志详情信息,您可从中查看任务实际读取的数据大小、数据条数、或失败原因等信息。执行日志查看详见DataSail 日志查看和诊断
  • 运行事件:查看任务运行过程中的事件信息,如任务状态变化、任务失败、重启等事件信息。

9 提交任务

任务调试成功且结果确认无误后,单击上方操作栏中的保存提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见:数据开发概述---离线任务提交
后续任务运维操作详见:离线任务运维