You need to enable JavaScript to run this app.
导航
Kafka 流式写入 TOS 对象存储
最近更新时间:2025.03.28 15:32:05首次发布时间:2023.12.26 14:03:37
我的收藏
有用
有用
无用
无用

Flink 是一个兼容 Apache Flink 的全托管流式计算平台,支持对海量实时数据的高效处理。而 火山引擎对象存储 TOS(Torch Object Storage)是火山引擎提供的海量、安全、低成本、易用、高可靠、高可用的分布式云存储服务。通过 Flink 内置的 Filesytem Connector,您可以轻松访问和管理火山引擎 TOS 上的数据。

场景介绍

本文模拟场景主要实现:读取消息队列 Kafka 数据写入对象存储 TOS。
在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路。

注意事项

  • 通过 Flink 任务往 TOS 写入文件时,使用 filesystem 连接器。为确保数据的一致性和容错性,需要在 Flink 参数配置中开启 Checkpoint
    如果不启用 Checkpoint,TOS Bucket 中只会写入临时文件,此时将无法映射数据到外表。
  • 为保证网络访问安全,本文所使用的云产品服务均使用内网访问方式,因此要求 Flink 资源池、Kafka 实例、TOS Bucket 均处于相同地域、相同 VPC。

前提条件

  1. 登录流式计算 Flink 版控制台
  2. 在顶部菜单栏选择目标地域,然后从项目管理页面进入项目。
  3. 创建 Flink SQL 任务。
    1. 在项目左侧导航栏选择作业开发 > 作业开发,然后单击加号按钮创建任务,也可以单击 Launcher 页签下的 Flink SQL 作业

Image

  1. 创建任务对话框,设置任务名称、类型(Flink SQL + 流式)、文件夹和引擎版本,然后单击确定

Image

| | | \
  |**配置** |**说明** |
  |---|---|
  | | | \
  |任务名称 |自定义设置任务的名称,如“datagen-kafka-tos”。 |\
  | |名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。 |
  | | | \
  |任务类型 |选择 作业类型 > Flink SQL > 流式。 |
  | | | \
  |所属文件夹 |系统提供文件夹管理功能,用于分类管理任务。 |\
  | |您可以直接选择系统默认存在的**数据开发文件夹**,也可以使用自定义创建的文件夹。 |
  | | | \
  |引擎版本 |按需选择引擎版本,本文选择引擎版本为 **Flink 1.16-volcano** 版本。 |
  | | | \
  |任务描述 |输入任务的描述语句,一般描述任务实现的功能。 |
  1. 在任务编辑区编写 SQL 任务的业务逻辑代码。
    示例代码含义为:将 Datagen 连接器实时生成的随机数写入 Kafka Topic 中;然后读取 Kafka Topic 数据并输出到 TOS Bucket。

    注意

    • 往 TOS 写入文件时,使用 filesystem 连接器。如果需要尽快在 TOS Bucket 中看到写入的文件和保证数据一致性,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数配置中开启 Checkpoint
      如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy
    • 一个任务中,如果存在一个表同时作为 source 和 sink,建议您直接验证 SQL 正确性,确保无误后可直接在线上进行测试。如果执行调试操作,可能会出现类似Table:xxx should not be both source and sink.的报错信息。
    create table orders (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time as localtimestamp
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='1',
     'fields.order_status.length' = '3',
     'fields.order_id.min' = '1',
     'fields.order_id.max' = '10000',
     'fields.order_product_id.min' = '1',
     'fields.order_product_id.max' = '100',
     'fields.order_customer_id.min' = '1',
     'fields.order_customer_id.max' = '1000'
    );
    
    create table kafka_table (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp
    ) WITH (
        'connector' = 'kafka', 
        --安全协议设置为SASL_PLAINTEXT。
        'properties.security.protocol' = 'SASL_PLAINTEXT',  
        --SASL 机制为  PLAIN。
        'properties.sasl.mechanism' = 'PLAIN', 
        -- 配置JAAS。
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="doc-user" password="qaP***6";',
        --Kafka实例的SASL_PLAINTEXT接入点。
        'properties.bootstrap.servers' = 'kafka-***hd8md.kafka.ivolces.com:9093',
        --Group和Topic。
        'topic' = 'topic-b', 
        'properties.group.id' = 'group-b', 
        --读取数据的启动模式,“earliest-offset”表示从最早分区开始读取。
        'scan.startup.mode' = 'earliest-offset',
        --定期扫描并发现新的Topic和Partition的时间间隔。
        'scan.topic-partition-discovery.interval' = '120s', 
        'format' = 'json',
        --关闭幂等性。
        'properties.enable.idempotence' = 'false'
    );
    
    
    insert into kafka_table 
    select * from orders;
    
    
    CREATE TABLE tos_sink (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`)
    WITH (
      'connector' = 'filesystem',   --访问 TOS 资源时使用 filesystem 连接器。
      'path' = 'tos://wuch-doc-tos/db/table_1',  --tos 路径,由 Bucket 名称和文件夹名称组成,文件夹名称建议为 DB 和 Table 名称。
      'sink.rolling-policy.file-size' = '1M',  --文件内存最大限制,达到该值关闭文件并新打开一个文件写入。
      'sink.rolling-policy.rollover-interval' = '5 min',  --文件持续写入时间,达到该值关闭文件并打开一个新文件写入。
      'format' = 'parquet'
    );
    
    
    insert into tos_sink
    select   
      order_id,
      order_product_id,
      order_customer_id,
      order_status,
      order_update_time,
      DATE_FORMAT (order_update_time, 'yyyy-MM-dd') as dt,
      DATE_FORMAT (order_update_time, 'HH') as `hour` 
      from kafka_table;
    
    SQL
  2. 在代码编辑区上方,单击验证按钮。
    系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
    Image

  3. 启用 Checkpoint。
    在代码编辑区上方,单击参数配置,然后开启 Checkpoint。
    Image

  4. 上线任务。

    1. 设置执行方式引擎版本,然后单击上线
      本文场景中执行方式设置为 STREAMING,引擎版本设置为 Flink 1.16-volcano
      Image

    2. 任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定
      Image

      配置

      说明

      运行资源池

      从下拉列表中选择任务运行的 Flink 资源池。

      任务优先级

      系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。
      任务优先级决定了任务内部的调度顺序,优先级高的任务先被调度,即 L3 先于 L4 被调度。

      调度策略

      根据需求配置任务调度策略:

      • GANG:保证任务的所有实例被一起调度,即当剩余资源满足任务正常运行所需资源时才进行分配;不满足所需资源则不分配。
        该策略不会出现分配资源后,任务却不能启动的现象,解决了资源死锁问题。
      • DRF:从多维资源考虑,更为合理地将资源公平分配给资源池内的各个任务,从而提升利用率。
        例如:剩余 10 核 40 GB 的资源,A 任务需要10 核 20 GB 资源;B 任务需要 2 核 8 GB 的资源。如果分配给 A,剩余 0 核 20 GB 资源无法被利用;DRF 策略会选择分配给 B,剩下 8 核 32 GB 可以继续给后来任务使用。

      调度时长

      设置为 GANG 调度策略时,需要设置调度时长。
      如果超过调度时长,任务就会调度失败。如果设置为 0,则会一直重试。

  5. 启动任务。

    1. 在项目左侧导航栏选择任务运维 > 任务管理,然后单击目标任务后方的启动按钮。
      Image
    2. 启动任务对话框,选择全新启动,然后单击确定
      Image
      任务启动需要一定时长,请耐心等待,启动成功后状态为运行中。如果您想了解其他启动方式和启动时的参数配置,请参见启动任务

在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路。您可以通过以下三种方式验证任务结果:

  • 方式一:查看 Flink UI
  • 方式二:查询 Kafka Topic 消息
  • 方式三:查看 TOS Bucket 文件

Flink SQL 任务正常运行后,您可以进入 Flink UI 页面,查看任务运行情况。

  1. 在 Flink 控制台左侧导航栏选择任务运维 >任务管理。,然后单击单击目标任务后方的 Flink UI。
    Image
  2. 浏览器将会自动打开 Flink UI 页面,可查看任务详情。

方式二:查询 Kafka Topic 消息

Flink SQL 任务正常运行后,您可以在 Kafka 控制台查看目标 Topic 中的数据。

  1. 登录消息队列 Kafka 控制台
  2. 实例列表页面单击实例名称,进入 Kafka 实例详情页面。
  3. 消息查询页签下,选择按位点查询按时间查询,查询 Topic 中的数据。
    如需了解 Kafka Topic 消息查询的详细信息,请参见消息查询
    Image

方式三:查看 TOS Bucket 文件

Flink SQL 任务正常运行后,您可以在 TOS Bucket 目标路径下查看文件。

  1. 登录对象存储控制台
  2. 桶列表页面,单击目标存储桶,进入存储通详情页面。
  3. 在存储桶的文件列表页面,进入目标文件夹,查看写入的文件详情。
    Image