本文介绍通过一个简单的 Flink SQL 任务,实现从 BMQ Topic 中读取实时数据,然后写入 TOS 中。
为保证网络访问安全,本文所使用的云产品服务均使用内网访问方式,因此要求 BMQ 资源池和 Flink 资源池均处于相同地域的同一个 VPC 内。
您可以在创建云资源前,先创建私有网络。相关文档,请参见创建私有网络和创建子网。
在项目左侧导航栏选择资源管理,然后单击创建资源池。
在创建资源池页面,设置资源池的基本信息、资源配置、网络信息、Topic 配置等关键参数,然后单击下一步:确认订单。
一级配置项 | 二级配置项 | 说明 |
---|---|---|
基本信息 | 资源类型 | 默认为通用资源。 |
资源模式 | 默认为 VCI 模式,即在通用资源-VCI 资源域上创建 BMQ 资源池。 | |
计费类型 | 选择资源池计费的类型。目前仅支持按量计费。 | |
资源池名称 | 输入资源池名称。
| |
地域及可用区部署 | 地域已选定,不可更改。
| |
所属项目 | 从下拉列表中选择资源池所属项目。 | |
资源配置 | 计算规格 | 根据业务场景预估需要的 Topic 数量、Consumer Group 数量、分区数量等,选择适合的资源池规格。 |
存储规格 | 默认使用 CloudFS 加速存储,无需额外配置。 | |
网络信息 | 私有网络 | 为保证内网顺利访问,建议选择已有云上业务的地域位置所在的 VPC。同一个 VPC 内,不同可用区子网之间是互通的。 |
子网 | 从下拉列表中选择子网。系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。 说明 如果是多可用区部署的资源池,需要为选择的所有可用区分别配置子网。 | |
安全组 | 从下拉列表中选择安全组。 | |
Topic 配置 | 消息保留时长 | 为该资源池下的所有 Topic 设置默认消息保留时长。
|
在订单详情页面,确认资源池配置信息,然后阅读并勾选产品相关协议,再单击立即购买。
提交购买订单后,您可以返回资源池页面。购买的资源池显示为初始化中,初始化完成后显示为运行中。
在资源池详情左侧导航栏选择 Topic,然后单击创建Topic。
在创建 Topic 对话框,设置名称、分区数、消息保留时长等,然后单击确定。
配置 | 说明 |
---|---|
Topic 名称 | 输入 Topic 名称。 |
描述 | 填写 Topic 的描述语言。 |
分区数 | 输入分区数。 |
消息保留时长 | 数据在 Topic 中的保留时长。
|
在资源池详情左侧导航栏选择 Consumer Group,然后单击创建 Group。
在创建 Group对话框,设置 Group 名称和描述,然后单击确定。
配置 | 说明 |
---|---|
Group 名称 | 自定义设置 Group 名称。
|
描述 | 输入描述信息。非必填。 |
登录对象存储控制台。
在左侧导航栏单击桶列表,然后单击创建桶。
在创建存储桶页面,设置存储桶名称、区域和桶策略等关键参数,然后单击确定。
此处仅介绍创建存储桶时的必填参数,如需了解更多信息,请参见创建存储桶。
配置 | 说明 |
---|---|
名称 | 自定义设置存储桶的名称。
|
区域 | 与 BMQ 资源池、Flink 资源池保持在同一个地域。 |
桶策略 | 设置存储桶的桶策略(Bucket Policy),此处选择私有策略。
|
在存储桶的文件列表页面,单击创建文件夹,然后设置文件夹名称并单击确定。
对象存储 TOS 以扁平化结构存放文件,为方便分类管理,您可以创建文件夹。
在项目左侧导航栏选择资源管理,然后单击资源池页签,再单击创建资源池。
在创建通用资源池页面,设置资源池基本信息、网络信息、存储信息等参数,然后单击下一步:确认订单。
一级配置项 | 二级配置项 | 说明 |
---|---|---|
基本信息 | 资源类型 | 默认为通用资源。 |
资源模式 | 默认为 VCI 模式,即在通用资源-VCI 资源域上部署 Flink 资源池。 | |
计费类型 | 在通用资源-VCI 资源域部署 Flink 资源池支持按量计费和包年包月计费类型,请按需选择。 | |
资源池名称 | 输入要创建的资源池名称。
| |
地域及可用区 |
| |
所属项目 | 从下拉列表中选择资源池所属项目。 | |
资源配置 | 计算规格 | 如果创建包年包月计费类型,则需要为 Flink 资源池手动配置资源,资源的基础单位为 CU,1 CU 的含义为:CPU 1 核、内存 4 GB。 |
网络信息 | 私有网络 | 从下拉列表中选择私有网络。 需要与 BMQ 资源池保持在相同 VPC 内。 |
子网 | 从下拉列表中选择子网。 | |
安全组 | 从下拉列表中选择安全组。 | |
存储信息 | TOS 对象存储 | 默认为产品初始化时关联的 TOS,不支持修改。 |
在详情页面,查看资源池配置详情,阅读并勾选 Flink 相关协议,然后单击立即购买。
您可以返回资源池列表页面,查看创建进度。创建完成后显示为运行中。
在项目左侧导航栏选择任务开发 > Jupyter lab,然后单击加号按钮创建任务,也可以单击 Launcher 页签下的 Flink Stream SQL 区块。
在创建任务对话框,设置任务名称、类型、文件夹和引擎版本,然后单击确定。
配置 | 说明 |
---|---|
任务名称 | 自定义设置任务的名称,如“datagen-bmq-tos”。 |
任务类型 | 选择 Flink 任务 > Flink Stream > SQL。 |
所在文件夹 | 系统提供文件夹管理功能,用于分类管理任务。您可以直接选择系统默认存在的数据开发文件夹。 |
引擎版本 | 目前支持 Flink 1.11-volcano 和 Flink 1.16-volcano 版本。 注意 如果您通过 Kafka 连接器连接 BMQ 资源,且使用 Flink 1.16-volcano 引擎版本,那么必须将 |
任务描述 | 输入任务的描述语句,一般描述任务实现的功能。 |
在任务编辑区编写 SQL 任务的业务逻辑代码。
此处您可以直接拷贝并使用以下代码。代码实现将 datagen 连接器实时生成的随机数先写入 BMQ Topic 中;然后读取 BMQ Topic 数据并输出到 TOS Bucket。
注意
'properties.enable.idempotence'
=
'false'
),用来关闭幂等,否则任务将会运行失败。Table:xxx should not be both source and sink.
。建议您直接验证 SQL 正确性,确保无误后可直接上线。create table orders ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.order_status.length' = '3', 'fields.order_id.min' = '1', 'fields.order_id.max' = '10000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '100', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); create table bmq_table ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'kafka', --访问 BMQ 资源时,使用 kafka 连接器。 'topic' = 'topic-b', --Topic 名称。 'properties.bootstrap.servers' = 'bmq***bq5-1.openstudio.ivolces.com:9092,bmq***bq5-2.openstudio.ivolces.com:9092,bmq***bq5-3.openstudio.ivolces.com:9092', --BMQ 资源池的用户接入点地址,三个地之间用英文逗号分隔。 'properties.group.id' = 'group-b', --BMQ的Consumer Group名称,需要提前创建,否则不能正常提交Offset。 'scan.startup.mode' = 'earliest-offset', 'scan.topic-partition-discovery.interval' = '120s', --定期扫描并发现新的Topic和Partition的时间间隔。 'format' = 'json' ); insert into bmq_table select * from orders; CREATE TABLE tos_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'filesystem', --访问 TOS 资源时使用 filesystem 连接器。 'path' = 'tos://doc-**test/fl**le/', --tos 路径,由 Bucket 名称和文件夹名称组成。 'sink.rolling-policy.file-size' = '1M', --文件内存最大限制,达到该值关闭文件并新打开一个文件写入。 'sink.rolling-policy.rollover-interval' = '5 min', --文件持续写入时间,达到该值关闭文件并打开一个新文件写入。 'format' = 'json' ); insert into tos_sink select * from bmq_table;
单击格式化按钮,系统自动调整SQL代码格式。
系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。
在代码编辑区上方,单击验证按钮。
系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
。
在代码编辑区上方,单击参数配置,然后开启 Checkpoint。
在任务开发栏目下查找并单击目标任务,然后在编辑区上方选择正确的执行方式和引擎版本,再单击上线。
说明
Flink Stream 类型任务选择执行方式为 STREAMING;Flink Batch 类型任务选择执行方式为 BATCH。
在任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定。
系统会提示任务上线成功,可以前往任务管理页面查看。
配置 | 说明 |
---|---|
运行资源池 | 从下拉列表中选择任务运行的 Flink 资源池。 |
任务优先级 | 系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。 |
调度策略 | 根据需求配置任务调度策略:
|
调度时长 | 设置为 GANG 调度策略时,需要设置调度时长。 |
在项目左侧导航栏选择任务运维 > 任务管理,然后单击目标任务后方的启动按钮。
在启动任务对话框,选择任务启动方式,然后单击确定。
任务启动需要一定时长,请耐心等待。启动成功后,状态为运行中。
配置 | 说明 |
---|---|
启动方式 | 请根据实际情况选择任务启动方式:
说明 首次上线的任务,只能是全新启动方式。 |
参数配置 | 任务携带在开发侧的并行度、Task Manager 和 Job Manager 的资源配置。在启动任务时支持您更新配置并快速生效。 说明 更新参数配置并启动任务后,将新增一个任务版本,并将最新配置同步到任务开发侧。
|
更多设置 | 在任务开发变更时新增或修改算子,可能会导致任务无法从快照恢复,此时您可以选择启用允许忽略部分算子状态功能,保证任务能正常运行。 注意
|
本文的 Flink SQL 任务代码实现的是将 datagen 连接器实时生成的随机数先写入 BMQ Topic 中;然后读取 BMQ Topic 数据并输出到 TOS Bucket。
因此,可以在 BMQ Topic 侧预览数据,以验证随机数据是否正常写入;也可以在 TOS Bucket 中查看是否写入文件。
登录云原生消息引擎控制台。
在顶部菜单栏选择目标地域。
在项目管理页面,通过项目名称模糊搜索目标项目,然后单击项目区块进入项目。
在左侧导航栏选择资源管理,然后单击目标资源池名称,进入资源池详情页面。
在资源池详情的左侧导航栏选择 Topic,然后单击目标 Topic 后方的数据预览。
在数据预览的对话框,选择数据预览的方式,填写相关信息,然后单击立即预览。
参数 | 说明 |
---|---|
数据类型 | 目前仅支持 Str 类型。 |
数据预览 | 选择数据预览的方式。
|
分区 | 选择存储消息的分区,两种预览方式都需要设置该参数。 |
时间点 | 选择根据时间点预览方式时,需要设置具体的时间点。 |
相对偏移量 | 选择根据位点预览方式时,需要设置相对偏移量。 |
查看数据预览结果,然后单击关闭。