You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Kafka 实时数据处理
Kafka 流式写入 TOS 对象存储
复制全文
Kafka 流式写入 TOS 对象存储

Flink 是一个兼容 Apache Flink 的全托管流式计算平台,支持对海量实时数据的高效处理。而 火山引擎对象存储 TOS(Torch Object Storage)是火山引擎提供的海量、安全、低成本、易用、高可靠、高可用的分布式云存储服务。通过 Flink 内置的 Filesytem Connector,您可以轻松访问和管理火山引擎 TOS 上的数据。

场景介绍

本文模拟场景主要实现:读取消息队列 Kafka 数据写入对象存储 TOS。
在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路。

注意事项

  • 通过 Flink 任务往 TOS 写入文件时,使用 filesystem 连接器。为确保数据的一致性和容错性,需要在 Flink 参数配置中开启 Checkpoint
    如果不启用 Checkpoint,TOS Bucket 中只会写入临时文件,此时将无法映射数据到外表。
  • 为保证网络访问安全,本文所使用的云产品服务均使用内网访问方式,因此要求 Flink 资源池、Kafka 实例、TOS Bucket 均处于相同地域、相同 VPC。

前提条件

  1. 登录流式计算 Flink 版控制台
  2. 在顶部菜单栏选择目标地域,然后从项目管理页面进入项目。
  3. 创建 Flink SQL 任务。
    1. 在项目左侧导航栏选择作业开发 > 作业开发,然后单击加号按钮创建任务,也可以单击 Launcher 页签下的 Flink SQL 作业

Image

  1. 创建任务对话框,设置任务名称、类型(Flink SQL + 流式)、文件夹和引擎版本,然后单击确定

Image

配置

说明

任务名称

自定义设置任务的名称,如“datagen-kafka-tos”。
名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。

任务类型

选择 作业类型 > Flink SQL > 流式。

所属文件夹

系统提供文件夹管理功能,用于分类管理任务。
您可以直接选择系统默认存在的数据开发文件夹,也可以使用自定义创建的文件夹。

引擎版本

按需选择引擎版本,本文选择引擎版本为 Flink 1.16-volcano 版本。

任务描述

输入任务的描述语句,一般描述任务实现的功能。

  1. 在任务编辑区编写 SQL 任务的业务逻辑代码。
    示例代码含义为:将 Datagen 连接器实时生成的随机数写入 Kafka Topic 中;然后读取 Kafka Topic 数据并输出到 TOS Bucket。

    注意

    • 往 TOS 写入文件时,使用 filesystem 连接器。如果需要尽快在 TOS Bucket 中看到写入的文件和保证数据一致性,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数配置中开启 Checkpoint
      如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy
    • 一个任务中,如果存在一个表同时作为 source 和 sink,建议您直接验证 SQL 正确性,确保无误后可直接在线上进行测试。如果执行调试操作,可能会出现类似Table:xxx should not be both source and sink.的报错信息。
    create table orders (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time as localtimestamp
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='1',
     'fields.order_status.length' = '3',
     'fields.order_id.min' = '1',
     'fields.order_id.max' = '10000',
     'fields.order_product_id.min' = '1',
     'fields.order_product_id.max' = '100',
     'fields.order_customer_id.min' = '1',
     'fields.order_customer_id.max' = '1000'
    );
    
    create table kafka_table (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp
    ) WITH (
        'connector' = 'kafka', 
        --安全协议设置为SASL_PLAINTEXT。
        'properties.security.protocol' = 'SASL_PLAINTEXT',  
        --SASL 机制为  PLAIN。
        'properties.sasl.mechanism' = 'PLAIN', 
        -- 配置JAAS。
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="doc-user" password="qaP***6";',
        --Kafka实例的SASL_PLAINTEXT接入点。
        'properties.bootstrap.servers' = 'kafka-***hd8md.kafka.ivolces.com:9093',
        --Group和Topic。
        'topic' = 'topic-b', 
        'properties.group.id' = 'group-b', 
        --读取数据的启动模式,“earliest-offset”表示从最早分区开始读取。
        'scan.startup.mode' = 'earliest-offset',
        --定期扫描并发现新的Topic和Partition的时间间隔。
        'scan.topic-partition-discovery.interval' = '120s', 
        'format' = 'json',
        --关闭幂等性。
        'properties.enable.idempotence' = 'false'
    );
    
    
    insert into kafka_table 
    select * from orders;
    
    
    CREATE TABLE tos_sink (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`)
    WITH (
      'connector' = 'filesystem',   --访问 TOS 资源时使用 filesystem 连接器。
      'path' = 'tos://wuch-doc-tos/db/table_1',  --tos 路径,由 Bucket 名称和文件夹名称组成,文件夹名称建议为 DB 和 Table 名称。
      'sink.rolling-policy.file-size' = '1M',  --文件内存最大限制,达到该值关闭文件并新打开一个文件写入。
      'sink.rolling-policy.rollover-interval' = '5 min',  --文件持续写入时间,达到该值关闭文件并打开一个新文件写入。
      'format' = 'parquet'
    );
    
    
    insert into tos_sink
    select   
      order_id,
      order_product_id,
      order_customer_id,
      order_status,
      order_update_time,
      DATE_FORMAT (order_update_time, 'yyyy-MM-dd') as dt,
      DATE_FORMAT (order_update_time, 'HH') as `hour` 
      from kafka_table;
    
  2. 在代码编辑区上方,单击验证按钮。
    系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
    Image

  3. 启用 Checkpoint。
    在代码编辑区上方,单击参数配置,然后开启 Checkpoint。
    Image

  4. 上线任务。

    1. 设置执行方式引擎版本,然后单击上线
      本文场景中执行方式设置为 STREAMING,引擎版本设置为 Flink 1.16-volcano
      Image

    2. 任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定
      Image

      配置

      说明

      运行资源池

      从下拉列表中选择任务运行的 Flink 资源池。

      任务优先级

      系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。
      任务优先级决定了任务内部的调度顺序,优先级高的任务先被调度,即 L3 先于 L4 被调度。

      调度策略

      根据需求配置任务调度策略:

      • GANG:保证任务的所有实例被一起调度,即当剩余资源满足任务正常运行所需资源时才进行分配;不满足所需资源则不分配。
        该策略不会出现分配资源后,任务却不能启动的现象,解决了资源死锁问题。
      • DRF:从多维资源考虑,更为合理地将资源公平分配给资源池内的各个任务,从而提升利用率。
        例如:剩余 10 核 40 GB 的资源,A 任务需要10 核 20 GB 资源;B 任务需要 2 核 8 GB 的资源。如果分配给 A,剩余 0 核 20 GB 资源无法被利用;DRF 策略会选择分配给 B,剩下 8 核 32 GB 可以继续给后来任务使用。

      调度时长

      设置为 GANG 调度策略时,需要设置调度时长。
      如果超过调度时长,任务就会调度失败。如果设置为 0,则会一直重试。

  5. 启动任务。

    1. 在项目左侧导航栏选择任务运维 > 任务管理,然后单击目标任务后方的启动按钮。
      Image
    2. 启动任务对话框,选择全新启动,然后单击确定
      Image
      任务启动需要一定时长,请耐心等待,启动成功后状态为运行中。如果您想了解其他启动方式和启动时的参数配置,请参见启动任务

在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路。您可以通过以下三种方式验证任务结果:

  • 方式一:查看 Flink UI
  • 方式二:查询 Kafka Topic 消息
  • 方式三:查看 TOS Bucket 文件

Flink SQL 任务正常运行后,您可以进入 Flink UI 页面,查看任务运行情况。

  1. 在 Flink 控制台左侧导航栏选择任务运维 >任务管理。,然后单击单击目标任务后方的 Flink UI。
    Image
  2. 浏览器将会自动打开 Flink UI 页面,可查看任务详情。

方式二:查询 Kafka Topic 消息

Flink SQL 任务正常运行后,您可以在 Kafka 控制台查看目标 Topic 中的数据。

  1. 登录消息队列 Kafka 控制台
  2. 实例列表页面单击实例名称,进入 Kafka 实例详情页面。
  3. 消息查询页签下,选择按位点查询按时间查询,查询 Topic 中的数据。
    如需了解 Kafka Topic 消息查询的详细信息,请参见消息查询
    Image

方式三:查看 TOS Bucket 文件

Flink SQL 任务正常运行后,您可以在 TOS Bucket 目标路径下查看文件。

  1. 登录对象存储控制台
  2. 桶列表页面,单击目标存储桶,进入存储通详情页面。
  3. 在存储桶的文件列表页面,进入目标文件夹,查看写入的文件详情。
    Image
最近更新时间:2025.07.31 13:32:08
这个页面对您有帮助吗?
有用
有用
无用
无用