You need to enable JavaScript to run this app.
导航
Kafka_to_StarRocks 通道实践
最近更新时间:2024.09.20 17:47:41首次发布时间:2024.09.20 17:39:24

DataSail 全域数据集成流式集成任务根据源端 Kafka 数据源指定 Topic 的形式,将数据实时同步至火山引擎 E-MapReduce(EMR)StarRocks 目的端中。
本文为您介绍如何创建 Kafka 实时同步数据至 StarRocks 中。

1 前置准备

  1. 已创建消息队列 Kafak 实例和 Topic 消息集合。操作详见创建资源
  2. 已创建 EMR StarRocks 集群实例。详见创建集群
  3. 已创建合适资源规格的独享数据集成资源组,并将其绑定至创建成功的 DataLeap 项目下。购买操作详见资源组管理,项目绑定操作详见数据集成资源组

2 创建数据源

2.1 Kafka 数据源配置(源端)

配置流式集成作业时,您需在数据源管理界面中,配置来源端 Kafka 数据源。详见 Kafka 数据源配置

2.2 StarRocks 数据源配置(目标端)

配置流式集成作业时,您需在数据源管理界面中,配置目标端 StarRocks 数据源。详见 StarRocks 数据源配置

3 新建流式集成任务

创建流式数据集成任务的步骤如下:

  1. 登录 DataLeap 租户控制台
  2. 在左侧导航栏,单击项目管理,进入项目列表界面。
  3. 单击相应的项目名称,进入到数据开发界面。
    图片
  4. 在数据开发界面,单击目录树上新建任务按钮,进入新建任务界面。
  5. 选择任务类型:
    1. 分类:数据集成
    2. 选择任务:流式集成
  6. 填写任务基本信息:
    1. 任务名称:输入任务的名称,只允许字母、数字、下划线和连字符,且需要在63个字符以内。如:stream_0101-test
    2. 保存至: 选择任务存放的目标文件夹目录。
      图片
  7. 单击确定按钮,完成任务创建。

4 配置流式集成任务

4.1 选择数据源

  1. 来源配置
    数据来源选择 Kafka,并完成以下相关参数配置:
    其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

    参数

    说明

    *数据源类型

    数据来源类型选择 Kafka。

    *数据源名称

    已在数据源管理界面注册的 Kafka 数据源,下拉可选。
    若还未建立相应数据源,可单击数据源管理按钮,前往创建 Kafka 数据源。

    *Topic 名称

    选择 Kafka 处理消息源的不同分类主题名称,下拉可选数据源下对应需读取数据的 Topic 名称,支持同时选择多个结构相同的 Topic。

    *数据类型

    支持 JSON、Pb、HBASE WAL,下拉可选,默认为JSON格式。
    本实践中,以 JSON 格式为例。

    示例数据

    填写一串完整的 JSON 串,需以 json 字符串形式描述 schema,支持多层级结构数据提取,方便字段映射时自动解析源端字段信息。如:

    {
        "id": 1839707,
         "name": "吕*",
         "address": "内蒙古自治区建市黄浦刘街D座 193097",
         "create_time": 1663574557,
         "event_time": 1663574257,
         "price": -76048.3231833319,
         "date_info": "2022-09-19",
         "list_info": [
            "qmiadtblRJhmisuCfdUq",
            "KbjSvEfrndxLxIgOYdTf",
            "xcxLhQWMNLZzCmZRTRCT",
            "wapgRRlsUpveJtmgVSPl",
            "qcIzMcNUlYxfkldFMitD"
              ]
    }
    
  2. 目标端配置
    流式数据目标端选择 StarRocks,并完成以下相关参数配置:
    其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

    参数

    说明

    *目标类型

    数据去向目标类型选择 StarRocks。

    *数据源名称

    已在数据源管理界面注册的 StarRocks 数据源,下拉可选。
    若还未建立相应数据源,可单击数据源管理按钮,前往创建 StarRocks 数据源。

    *数据表

    数据源下所属需数据写入的表名,下拉可选。
    若目标 StarRocks 表还未创建,且源端已选定需要采集的表或 Topic,并添加源端字段映射完成后,您可通过单击“一键建表”按钮,依据对源端采集的字段信息,快速进行目标表的创建。

    说明

    在一键建表的弹窗界面中,您可根据实际情况修改新建表的 DDL 语句,如表名、字段名、字段描述等信息。您也可对建表语句进行格式化复制编辑操作。

    *分区类型

    目标表选择分区表时,您可选择将源端数据写入动态分区类型:

    • 动态分区:即根据源端字段值内容,进行动态写入,动态分区值必须存在于源端 Columns 中,选择动态分区类型后,便可自动将表分区字段,加入到下方字段映射中。

4.2 字段映射

数据来源和目标表配置完成后,您可进行字段映射配置。

  • 您可单击自动添加按钮,一键添加来源和目标端的字段映射关系;
  • 若您的场景中,字段存在比较复杂的关系,如需要将源端字符串形式的时间转换为日期格式时,您便可选择转换模式,并配置来源节点、数据转换节点、目标节点等。
    • 来源节点:
      单击自动添加按钮,添加源端字段信息,并单击确认按钮,完成来源节点配置。
      图片

    • 数据转换:
      单击添加 SQL 转换按钮,并在编辑框中输入转换 SQL,单击右下角确认按钮,完成数据转换节点配置。
      图片
      转换语句支持 Flink SQL 语法,当前支持 1.11 版本,语法参考官网: https://nightlies.apache.org/flink/flink-docs-release-1.11/
      转换函数示例说明:

      • 10 位时间戳 字符串转换成 日期和日期时间字符串

        SELECT id ,`name`,FROM_UNIXTIME(CAST(t_time AS BIGINT),'yyyy-MM-dd') as t_time FROM Source
        
        Trans0 SELECT id ,`name`,FROM_UNIXTIME(CAST(t_time AS BIGINT),'yyyy-MM-dd HH:mm:ss') as t_time FROM Source
        
      • 13 位时间戳 字符串转换成 日期和日期时间字符串

        Trans0 SELECT id ,`name`,FROM_UNIXTIME(CAST(t_time AS BIGINT)/1000,'yyyy-MM-dd') as t_time FROM Source
        
        Trans0 SELECT id ,`name`,FROM_UNIXTIME(CAST(t_time AS BIGINT)/1000,'yyyy-MM-dd HH:mm:ss') as t_time FROM Source
        

      转换模式更多操作详见4.1 转换模式

  • 目标节点:
    单击自动添加按钮,添加目标端字段信息,并单击确认按钮,完成目标节点配置。

4.3 任务运行参数

字段映射配置完成后,您可继续设置是否开启归档、默认消费起始、高级参数等任务运行参数。
图片

4.4 参数设置

单击右侧参数设置按钮,进入设置流式任务运行资源相关信息。
图片

4.5 选择数据集成资源组

数据集成资源组窗口中,下拉选择在2 创建数据源时,数据源测试连通性成功的集成资源组信息。
图片

4.6 选择镜像版本

在右侧镜像版本入口,下拉选择当前最新的流式镜像版本信息。
图片

5 提交上线

任务所需参数配置完成后,将任务提交发布到运维中心实时任务运维中执行。 单击操作栏中的保存提交上线按钮,在弹窗中,需先通过提交上线流程,最后单击确认按钮,完成作业提交。
图片
后续任务运维操作详见:实时任务运维