You need to enable JavaScript to run this app.
导航
读取云原生消息引擎 BMQ 数据写入对象存储 TOS
最近更新时间:2024.01.29 14:12:20首次发布时间:2023.04.17 17:40:13

本文介绍通过一个简单的 Flink SQL 任务,实现从 BMQ Topic 中读取实时数据,然后写入 TOS 中。

流程介绍

图片

  1. 准备数据源 BMQ Topic。
    您需要在云原生消息引擎控制台创建资源池、Topic 和 Consumer Group,并获取资源池接入点地址。
  2. 准备数据目的 TOS Bucket。
    您需要在对象存储控制台创建存储桶和文件夹。
  3. 开发 Flink SQL 任务。
    当您准备好数据源和数据目的后,便可以在流式计算 Flink 控制台开发 SQL 任务,通过简单的代码实现从 BMQ Topic 中读取数据写入到 TOS Bucket。

前提条件

为保证网络访问安全,本文所使用的云产品服务均使用内网访问方式,因此要求 BMQ 资源池和 Flink 资源池均处于相同地域的同一个 VPC 内。
您可以在创建云资源前,先创建私有网络。相关文档,请参见创建私有网络创建子网

步骤一:准备数据源 BMQ Topic

  1. 登录云原生消息引擎控制台
  2. 在顶部菜单栏选择目标地域,然后从项目管理页面进入目标项目。
  3. 创建资源池。
    1. 在项目左侧导航栏选择资源管理,然后单击创建资源池

    2. 创建资源池页面,设置资源池的基本信息、资源配置、网络信息、Topic 配置等关键参数,然后单击下一步:确认订单
      图片

      一级配置项

      二级配置项

      说明

      基本信息

      资源类型

      默认为通用资源

      资源模式

      默认为 VCI 模式,即在通用资源-VCI 资源域上创建 BMQ 资源池。

      计费类型

      选择资源池计费的类型。目前仅支持按量计费

      资源池名称

      输入资源池名称。

      • 由小写字母、数字和短横线(-)构成。
      • 长度为 1~64 个字符。

      地域及可用区部署

      地域已选定,不可更改。
      根据业务的网络延迟、高可用容灾等需求,选择单可用区或者多可用区

      • 单可用区:选择一个合适的可用区即可。
      • 多可用区:默认选中多个可用区。

      所属项目

      从下拉列表中选择资源池所属项目。

      资源配置

      计算规格

      根据业务场景预估需要的 Topic 数量、Consumer Group 数量、分区数量等,选择适合的资源池规格。

      存储规格

      默认使用 CloudFS 加速存储,无需额外配置。

      网络信息

      私有网络

      为保证内网顺利访问,建议选择已有云上业务的地域位置所在的 VPC。同一个 VPC 内,不同可用区子网之间是互通的。
      如果还未创建私有网络,请参见创建私有网络

      子网

      从下拉列表中选择子网。系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。

      说明

      如果是多可用区部署的资源池,需要为选择的所有可用区分别配置子网。

      安全组

      从下拉列表中选择安全组。

      Topic 配置

      消息保留时长

      为该资源池下的所有 Topic 设置默认消息保留时长。

      • 默认为 72 小时,可按小时粒度调节。
      • 支持设置的留存范围为 1~336 小时(14天)。
    3. 订单详情页面,确认资源池配置信息,然后阅读并勾选产品相关协议,再单击立即购买
      提交购买订单后,您可以返回资源池页面。购买的资源池显示为初始化中,初始化完成后显示为运行中

  4. 获取接入点地址。
    1. 资源池管理页面,单击资源池名称,进入资源池详情页面。
    2. 在资源池详情页面的服务访问页签下,查看并复制资源池的用户接入点地址。
      图片
  5. 创建 Topic。
    1. 在资源池详情左侧导航栏选择 Topic,然后单击创建Topic

    2. 创建 Topic 对话框,设置名称、分区数、消息保留时长等,然后单击确定
      图片

      配置

      说明

      Topic 名称

      输入 Topic 名称。
      只能由小写英文字符、数字、下划线(_)和短横线(-)组成,长度为 3~64 个字符。

      描述

      填写 Topic 的描述语言。

      分区数

      输入分区数。
      默认为 12,输入框下展示剩余可用分区数。

      消息保留时长

      数据在 Topic 中的保留时长。

      • 默认与资源池设置的全局消息保留时长保持一致,但也可按小时粒度自主调节。
      • 支持设置的留存范围为 1~336 小时(14 天)。
  6. 创建 Consumer Group。
    1. 在资源池详情左侧导航栏选择 Consumer Group,然后单击创建 Group

    2. 创建 Group对话框,设置 Group 名称和描述,然后单击确定
      图片

      配置

      说明

      Group 名称

      自定义设置 Group 名称。

      • 由小写英文字母、数字、短横线(-)和下划线(_)构成。
      • 长度为 3~64 个字符。

      描述

      输入描述信息。非必填。

步骤二:准备数据目的 TOS Bucket

  1. 登录对象存储控制台

  2. 在左侧导航栏单击桶列表,然后单击创建桶

  3. 创建存储桶页面,设置存储桶名称、区域和桶策略等关键参数,然后单击确定
    图片
    此处仅介绍创建存储桶时的必填参数,如需了解更多信息,请参见创建存储桶

    配置

    说明

    名称

    自定义设置存储桶的名称。

    • 只能由数字、短横线(-)、小写英文字母组成,开头和结尾只能是数字或字母。
    • 长度为 3~63 字符。

    区域

    与 BMQ 资源池、Flink 资源池保持在同一个地域。

    桶策略

    设置存储桶的桶策略(Bucket Policy),此处选择私有策略。
    策略说明如下:

    • 私有:默认值,只有该桶的归属者及有授权的账号对桶中的对象有读写权限,推荐使用。
    • 公共读:任何人均可对公共读权限的存储桶进行读取操作,同时产生流量费用。
    • 公共读写:任何人均可对公共读写权限的存储桶进行读写操作,同时产生流量费用。
  4. 在存储桶的文件列表页面,单击创建文件夹,然后设置文件夹名称并单击确定
    对象存储 TOS 以扁平化结构存放文件,为方便分类管理,您可以创建文件夹。

  1. 登录流式计算 Flink 版控制台
  2. 在顶部菜单栏选择目标地域,然后从项目管理页面进入项目。
  3. 创建资源池。
    1. 在项目左侧导航栏选择资源管理,然后单击资源池页签,再单击创建资源池

    2. 创建通用资源池页面,设置资源池基本信息、网络信息、存储信息等参数,然后单击下一步:确认订单
      图片

      一级配置项

      二级配置项

      说明

      基本信息

      资源类型

      默认为通用资源

      资源模式

      默认为 VCI 模式,即在通用资源-VCI 资源域上部署 Flink 资源池。

      计费类型

      在通用资源-VCI 资源域部署 Flink 资源池支持按量计费包年包月计费类型,请按需选择。
      如需了解计费详情,请参见按量计费包年包月
      如果选择包年包月计费类型,还需要选择购买时长,以及确认是否需要自动续费

      资源池名称

      输入要创建的资源池名称。

      • 全局唯一且不能为空字符串。
      • 长度为 1~63 个字符。
      • 支持英文小写字母、数字和短横线(-);且名称开头和结尾必须是字母或数字。

      地域及可用区

      • 地域已选定,不可更改。与 BMQ 资源池、TOS Bucket 保持在同一个地域。
      • 根据业务的网络延迟、高可用等需求,选择一个合适的可用区。

      所属项目

      从下拉列表中选择资源池所属项目。

      资源配置

      计算规格

      如果创建包年包月计费类型,则需要为 Flink 资源池手动配置资源,资源的基础单位为 CU,1 CU 的含义为:CPU 1 核、内存 4 GB。
      请根据实际需求设置资源量。

      网络信息

      私有网络

      从下拉列表中选择私有网络。 需要与 BMQ 资源池保持在相同 VPC 内。

      子网

      从下拉列表中选择子网。
      系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。

      安全组

      从下拉列表中选择安全组。

      存储信息

      TOS 对象存储

      默认为产品初始化时关联的 TOS,不支持修改。

    3. 详情页面,查看资源池配置详情,阅读并勾选 Flink 相关协议,然后单击立即购买
      您可以返回资源池列表页面,查看创建进度。创建完成后显示为运行中

  4. 开发任务。
    1. 在项目左侧导航栏选择任务开发 > Jupyter lab,然后单击加号按钮创建任务,也可以单击 Launcher 页签下的 Flink Stream SQL 区块。
      图片

    2. 创建任务对话框,设置任务名称、类型、文件夹和引擎版本,然后单击确定
      图片

      配置

      说明

      任务名称

      自定义设置任务的名称,如“datagen-bmq-tos”。
      名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。

      任务类型

      选择 Flink 任务 > Flink Stream > SQL

      所在文件夹

      系统提供文件夹管理功能,用于分类管理任务。您可以直接选择系统默认存在的数据开发文件夹
      如果您有自建文件夹管理任务的要求,请单击创建文件夹的文件夹按钮,然后创建文件夹。

      引擎版本

      目前支持 Flink 1.11-volcanoFlink 1.16-volcano 版本。

      注意

      如果您通过 Kafka 连接器连接 BMQ 资源,且使用 Flink 1.16-volcano 引擎版本,那么必须将properties.enable.idempotence参数设置为 false 以关闭幂等,否则任务会运行失败。

      任务描述

      输入任务的描述语句,一般描述任务实现的功能。

    3. 在任务编辑区编写 SQL 任务的业务逻辑代码。
      此处您可以直接拷贝并使用以下代码。代码实现将 datagen 连接器实时生成的随机数先写入 BMQ Topic 中;然后读取 BMQ Topic 数据并输出到 TOS Bucket。

      注意

      • 通过 Kafka 连接器往 BMQ Topic 中读写数据时,如果使用 Flink 1.11 引擎版本,则可以直接使用以下 Demo。如果使用 Flink 1.16 引擎版本时,需要额外添加配置('properties.enable.idempotence'='false'),用来关闭幂等,否则任务将会运行失败。
      • Flink 任务中通过 Kafka 连接器实现往 BMQ Topic 中写入数据。写入数据时如果出现吞吐量不足,您可以通过设置 properties.batch_sizeproperties.linger_ms 参数提升吞吐量。Kafka 相关参数,请参见Kafka/BMQ
      • 往 TOS 写入文件时,使用 filesystem 连接器。如果需要尽快在 TOS Bucket 中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。
        Filesystem 连接器参数,请参见Filesystem;如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy
      • 一个任务中,如果存在一个表同时作为 source 和 sink,调试的时候会报错Table:xxx should not be both source and sink.。建议您直接验证 SQL 正确性,确保无误后可直接上线。
      create table orders (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time as localtimestamp
      ) WITH (
       'connector' = 'datagen',
       'rows-per-second'='1',
       'fields.order_status.length' = '3',
       'fields.order_id.min' = '1',
       'fields.order_id.max' = '10000',
       'fields.order_product_id.min' = '1',
       'fields.order_product_id.max' = '100',
       'fields.order_customer_id.min' = '1',
       'fields.order_customer_id.max' = '1000'
      );
      
      create table bmq_table (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time timestamp
      ) WITH (
        'connector' = 'kafka',   --访问 BMQ 资源时,使用 kafka 连接器。
        'topic' = 'topic-b',     --Topic 名称。
        'properties.bootstrap.servers' = 'bmq***bq5-1.openstudio.ivolces.com:9092,bmq***bq5-2.openstudio.ivolces.com:9092,bmq***bq5-3.openstudio.ivolces.com:9092',  --BMQ 资源池的用户接入点地址,三个地之间用英文逗号分隔。
        'properties.group.id' = 'group-b',  --BMQ的Consumer Group名称,需要提前创建,否则不能正常提交Offset。
        'scan.startup.mode' = 'earliest-offset',  
        'scan.topic-partition-discovery.interval' = '120s',   --定期扫描并发现新的Topic和Partition的时间间隔。
        'format' = 'json'
      );
      
      insert into bmq_table 
      select * from orders;
      
      CREATE TABLE tos_sink (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time timestamp
      ) WITH (
        'connector' = 'filesystem',   --访问 TOS 资源时使用 filesystem 连接器。
        'path' = 'tos://doc-**test/fl**le/',  --tos 路径,由 Bucket 名称和文件夹名称组成。
        'sink.rolling-policy.file-size' = '1M',  --文件内存最大限制,达到该值关闭文件并新打开一个文件写入。
        'sink.rolling-policy.rollover-interval' = '5 min',  --文件持续写入时间,达到该值关闭文件并打开一个新文件写入。
        'format' = 'json'
      );
      
      insert into tos_sink
      select * from bmq_table;
      
    4. 单击格式化按钮,系统自动调整SQL代码格式。
      系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。

    5. 在代码编辑区上方,单击验证按钮。
      系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
      图片

    6. 在代码编辑区上方,单击参数配置,然后开启 Checkpoint。
      图片

  5. 启动任务。
    1. 任务开发栏目下查找并单击目标任务,然后在编辑区上方选择正确的执行方式引擎版本,再单击上线

      说明

      Flink Stream 类型任务选择执行方式为 STREAMING;Flink Batch 类型任务选择执行方式为 BATCH

    2. 任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定
      系统会提示任务上线成功,可以前往任务管理页面查看。
      图片

      配置

      说明

      运行资源池

      从下拉列表中选择任务运行的 Flink 资源池。

      任务优先级

      系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。
      任务优先级决定了任务内部的调度顺序,优先级高的任务先被调度,即 L3 先于 L4 被调度。

      调度策略

      根据需求配置任务调度策略:

      • GANG:保证任务的所有实例被一起调度,即当剩余资源满足任务正常运行所需资源时才进行分配;不满足所需资源则不分配。
        该策略不会出现分配资源后,任务却不能启动的现象,解决了资源死锁问题。
      • DRF:从多维资源考虑,更为合理地将资源公平分配给资源池内的各个任务,从而提升利用率。
        例如:剩余 10 核 40 GB 的资源,A 任务需要10 核 20 GB 资源;B 任务需要 2 核 8 GB 的资源。如果分配给 A,剩余 0 核 20 GB 资源无法被利用;DRF 策略会选择分配给 B,剩下 8 核 32 GB 可以继续给后来任务使用。

      调度时长

      设置为 GANG 调度策略时,需要设置调度时长。
      如果超过调度时长,任务就会调度失败。如果设置为 0,则会一直重试。

    3. 在项目左侧导航栏选择任务运维 > 任务管理,然后单击目标任务后方的启动按钮。

    4. 启动任务对话框,选择任务启动方式,然后单击确定
      任务启动需要一定时长,请耐心等待。启动成功后,状态为运行中
      图片

      配置

      说明

      启动方式

      请根据实际情况选择任务启动方式:

      • 从最新状态启动:以最新的 Checkpoint 或 Savepoint 启动。
      • 全新启动:不使用 Checkpoint 或 Savepoint,直接启动。
      • 指定快照启动:指定目标快照(Savepoint)启动。

      说明

      首次上线的任务,只能是全新启动方式。

      参数配置

      任务携带在开发侧的并行度、Task Manager 和 Job Manager 的资源配置。在启动任务时支持您更新配置并快速生效。

      说明

      更新参数配置并启动任务后,将新增一个任务版本,并将最新配置同步到任务开发侧。

      • 并行度:任务全局并发数。
      • 单个 TaskManager CPU 数:单个 TaskManager 的 CPU 核数。
      • 单个 TaskManager 内存大小:单个 TaskManager 占用的内存大小。
      • 单个 TaskManager slot 数:单个 TaskManager 的 Slot 数量。
      • JobManager CPU 数:单个 JobManager 的 CPU 核数。
      • JobManager 内存大小:单个 JobManager 占用的内存大小。

      更多设置

      在任务开发变更时新增或修改算子,可能会导致任务无法从快照恢复,此时您可以选择启用允许忽略部分算子状态功能,保证任务能正常运行。
      图片

      注意

      • 仅当选择指定快照启动从最新状态启动方式时,支持勾选忽略部分算子状态。
      • 当您选择全新启动方式时,不支持忽略算子状态。

结果验证

本文的 Flink SQL 任务代码实现的是将 datagen 连接器实时生成的随机数先写入 BMQ Topic 中;然后读取 BMQ Topic 数据并输出到 TOS Bucket。
因此,可以在 BMQ Topic 侧预览数据,以验证随机数据是否正常写入;也可以在 TOS Bucket 中查看是否写入文件。

预览 BMQ Topic 数据

  1. 登录云原生消息引擎控制台

  2. 在顶部菜单栏选择目标地域。

  3. 项目管理页面,通过项目名称模糊搜索目标项目,然后单击项目区块进入项目。

  4. 在左侧导航栏选择资源管理,然后单击目标资源池名称,进入资源池详情页面。

  5. 在资源池详情的左侧导航栏选择 Topic,然后单击目标 Topic 后方的数据预览
    图片

  6. 数据预览的对话框,选择数据预览的方式,填写相关信息,然后单击立即预览
    图片

    参数

    说明

    数据类型

    目前仅支持 Str 类型。

    数据预览

    选择数据预览的方式。

    • 根据时间点:根据生产消息的时间点来预览信息。
    • 根据位点:根据消费位点来预览信息。

    分区

    选择存储消息的分区,两种预览方式都需要设置该参数。

    时间点

    选择根据时间点预览方式时,需要设置具体的时间点。

    相对偏移量

    选择根据位点预览方式时,需要设置相对偏移量。

  7. 查看数据预览结果,然后单击关闭
    图片

查看 TOS Bucket 文件

  1. 登录对象存储控制台
  2. 在左侧导航栏选择桶列表,然后单击目标桶名称。
  3. 文件列表页面,单击目标文件夹名称,查看写入的文件详情。
    图片