本文介绍如何通过一个简单的 Flink SQL 任务,实现从 BMQ Topic 中读取实时数据,然后写入 ESCloud Index 中。
为保证网络访问安全,本文所使用的云产品服务均使用内网访问方式,因此要求 BMQ 资源池、云搜索服务实例和 Flink 资源池均处于相同地域的同一个 VPC 内。
您可以在创建云资源前,先创建私有网络。相关文档,请参见创建私有网络和创建子网。
登录云原生消息引擎控制台。
在顶部菜单栏选择目标地域,然后从项目管理页面进入目标项目。
创建资源池。
在项目左侧导航栏选择资源管理,然后单击创建资源池。
在创建资源池页面,设置资源池的基本信息、资源配置、网络信息、Topic 配置等关键参数,然后单击下一步:确认订单。
一级配置项 | 二级配置项 | 说明 |
---|---|---|
基本信息 | 资源类型 | 默认为通用资源。 |
资源模式 | 默认为 VCI 模式,即在通用资源-VCI 资源域上创建 BMQ 资源池。 | |
计费类型 | 选择资源池计费的类型。目前仅支持按量计费。 | |
资源池名称 | 输入资源池名称。
| |
地域及可用区部署 | 地域已选定,不可更改。
| |
所属项目 | 从下拉列表中选择资源池所属项目。 | |
资源配置 | 计算规格 | 根据业务场景预估需要的 Topic 数量、Consumer Group 数量、分区数量等,选择适合的资源池规格。 |
存储规格 | 默认使用 CloudFS 加速存储,无需额外配置。 | |
网络信息 | 私有网络 | 为保证内网顺利访问,建议选择已有云上业务的地域位置所在的 VPC。同一个 VPC 内,不同可用区子网之间是互通的。 |
子网 | 从下拉列表中选择子网。系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。 说明 如果是多可用区部署的资源池,需要为选择的所有可用区分别配置子网。 | |
安全组 | 从下拉列表中选择安全组。 | |
Topic 配置 | 消息保留时长 | 为该资源池下的所有 Topic 设置默认消息保留时长。
|
在订单详情页面,确认资源池配置信息,然后阅读并勾选产品相关协议,再单击立即购买。
提交购买订单后,您可以返回资源池页面。购买的资源池显示为初始化中,初始化完成后显示为运行中。
获取接入点地址。
创建 Topic。
在资源池详情左侧导航栏选择 Topic,然后单击创建Topic。
在创建 Topic 对话框,设置名称、分区数、消息保留时长等,然后单击确定。
配置 | 说明 |
---|---|
Topic 名称 | 输入 Topic 名称。 |
描述 | 填写 Topic 的描述语言。 |
分区数 | 输入分区数。 |
消息保留时长 | 数据在 Topic 中的保留时长。
|
创建 Consumer Group。
在资源池详情左侧导航栏选择 Consumer Group,然后单击创建 Group。
在创建 Group对话框,设置 Group 名称和描述,然后单击确定。
配置 | 说明 |
---|---|
Group 名称 | 自定义设置 Group 名称。
|
描述 | 输入描述信息。非必填。 |
在实例列表页面,单击创建实例。
在创建实例页面,设置实例名称,可用区、版本、节点详情和访问方式等参数,然后单击确认订单。
配置 | 说明 |
---|---|
实例名称 | 自定义实例名称,非必填项。如果不设置实例名称,默认生成与实例ID相同的名称。
|
可用区 | 选择需要创建实例的可用区。 |
版本 | 兼容 6.7.1 及 7.10.2 Elasticsearch 版本。 |
部署节点类型 | 支持以下节点类型:
|
节点详情 | 配置数据节点或专有主节点规格。
说明
|
访问方式 | 支持售卖区访问、存储内网访问、公共服务区访问三种方式 。
|
私有网络 | 当选择售卖区访问云搜索服务实例时,需要选择特定 VPC,才能访问实例。为保证内网顺利访问,此处需要选择和已创建的 BMQ 资源池所在的 VPC 保持一致。 说明
|
子网 | 系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。 |
所属项目 | 选择云搜索实例所属的项目,便于管理资源,非必选项。 |
用户名 | 默认为 admin,不能修改。用于 Kibana 页面登录,以及实例登录认证。 |
登录密码 | admin 账号对应的密码,请按照规则设置,并妥善保存。
|
确认密码 | 输入上一步输入的密码。 |
传输协议 | 选择传输协议为 HTTP 或者 HTTPS。 注意 在 Flink 任务中,目前暂时不支持 HTTPS 传输协议的实例,此处选择为 HTTP。如果在 Flink 任务中配置 ES 的 HTTPS 访问地址,任务将运行失败。 |
删除保护 | 选择是否启用删除保护,默认不启用。 |
计费类型 | 选择实例的计费类型,支持按量计费和包年包月。关于计费的详细信息,请参见计费概述。 |
在确认订单页面,查看实例配置详情,阅读并勾选相关产品协议,然后单击立即购买。
实例购买成功后,您可以单击去控制台,页面会自动跳转至实例列表页,方便您查看刚创建的实例。实例从创建中变为运行中,则表示实例创建成功。
在项目左侧导航栏选择资源管理,然后单击资源池页签,再单击创建资源池。
在创建通用资源池页面,设置资源池基本信息、网络信息、存储信息等参数,然后单击下一步:确认订单。
一级配置项 | 二级配置项 | 说明 |
---|---|---|
基本信息 | 资源类型 | 默认为通用资源。 |
资源模式 | 默认为 VCI 模式,即在通用资源-VCI 资源域上部署 Flink 资源池。 | |
计费类型 | 在通用资源-VCI 资源域部署 Flink 资源池支持按量计费和包年包月计费类型,请按需选择。 | |
资源池名称 | 输入要创建的资源池名称。
| |
地域及可用区 |
| |
所属项目 | 从下拉列表中选择资源池所属项目。 | |
资源配置 | 计算规格 | 如果创建包年包月计费类型,则需要为 Flink 资源池手动配置资源,资源的基础单位为 CU,1 CU 的含义为:CPU 1 核、内存 4 GB。 |
网络信息 | 私有网络 | 从下拉列表中选择私有网络。 需要与 BMQ 资源池、云搜索服务实例保持在相同 VPC 内。 |
子网 | 从下拉列表中选择子网。 | |
安全组 | 从下拉列表中选择安全组。 | |
存储信息 | TOS 对象存储 | 默认为产品初始化时关联的 TOS,不支持修改。 |
在详情页面,查看资源池配置详情,阅读并勾选 Flink 相关协议,然后单击立即购买。
您可以返回资源池列表页面,查看创建进度。创建完成后显示为运行中。
在项目左侧导航栏选择任务开发 > Jupyter lab,然后单击加号按钮创建任务,也可以单击 Launcher 页签下的 Flink Stream SQL 区块。
在创建任务对话框,设置任务名称、类型、文件夹和引擎版本,然后单击确定。
配置 | 说明 |
---|---|
任务名称 | 自定义设置任务的名称。 |
任务类型 | 选择 Flink 任务 > Flink Stream > SQL。 |
所在文件夹 | 系统提供文件夹管理功能,用于分类管理任务。您可以直接选择系统默认存在的数据开发文件夹。 |
引擎版本 | 目前支持 Flink 1.11-volcano 和 Flink 1.16-volcano 版本。 注意 如果您通过 Kafka 连接器连接 BMQ 资源,且使用 Flink 1.16-volcano 引擎版本,那么必须将 |
任务描述 | 输入任务的描述语句,一般描述任务实现的功能。 |
在任务编辑区编写 SQL 任务的业务逻辑代码。
此处您可以直接拷贝并使用以下代码。代码实现将 datagen 连接器实时生成的随机数先写入 BMQ Topic 中;然后读取 BMQ Topic 数据并输出到 ESCloud Index 中。
注意
'properties.enable.idempotence'
=
'false'
),用来关闭幂等,否则任务将会运行失败。'document-type'
=
'test'
),用来指明文档类型。create table orders_datagen ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.order_status.length' = '3', 'fields.order_id.min' = '1', 'fields.order_id.max' = '10000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '1000', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); create table bmq_table ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'kafka', 'topic' = 'topic-a', --Topic名称。 'properties.bootstrap.servers' = 'bmq-****q5-1.openstudio.ivolces.com:9092', --资源池接入点地址。 'properties.group.id' = 'group-a', -- BMQ的Consumer Group名称。需要提前创建,否则不能正常提交Offset。 'scan.startup.mode' = 'earliest-offset', 'scan.topic-partition-discovery.interval' = '120s', --定期扫描并发现新的Topic和Partition的时间间隔。 'format' = 'json' ); insert into bmq_table select * from orders_datagen; create table escloud_table ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'elasticsearch-7', --连接器版本,需要与实例版本保持相同,以免出现不兼容问题。 'hosts' = 'http://elasticsearch-pt8y***g3mi.escloud.ivolces.com:9200', --实例访问地址。Flink 任务中暂不支持 HTTPS 传输协议的ES实例。 'index' = 'test_orders', --Index名称。无需提前创建索引,实例将会自动创建。 'username' = 'admin', --实例访问用户名称。 'password'='cd****456' --用户密码。 ); insert into escloud_table select * from bmq_table;
单击格式化按钮,系统自动调整SQL代码格式。
系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。
在代码编辑区上方,单击验证按钮。
系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
。
在任务编辑区,单击上方的上线按钮。
在任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定。
系统会提示任务上线成功,可以前往任务管理页面查看。
配置 | 说明 |
---|---|
运行资源池 | 从下拉列表中选择任务运行的 Flink 资源池。 |
任务优先级 | 系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。 |
调度策略 | 根据需求配置任务调度策略:
|
调度时长 | 设置为 GANG 调度策略时,需要设置调度时长。 |
在项目左侧导航栏选择任务运维 > 任务管理,然后单击目标任务后方的启动按钮。
在启动任务对话框,选择任务启动方式,然后单击确定。
任务启动需要一定时长,请耐心等待。启动成功后,状态为运行中。
配置 | 说明 |
---|---|
启动方式 | 请根据实际情况选择任务启动方式:
说明 首次上线的任务,只能是全新启动方式。 |
参数配置 | 任务携带在开发侧的并行度、Task Manager 和 Job Manager 的资源配置。在启动任务时支持您更新配置并快速生效。 说明 更新参数配置并启动任务后,将新增一个任务版本,并将最新配置同步到任务开发侧。
|
更多设置 | 在任务开发变更时新增或修改算子,可能会导致任务无法从快照恢复,此时您可以选择启用允许忽略部分算子状态功能,保证任务能正常运行。 注意
|
您可以登录 Kibana Web 页面,查询索引(test_orders
)中是否被成功写入随机生成的数据。
登录云搜索服务控制台。
在顶部导航栏选择目标地域。
在实例列表页面,选择目标实例,然后单击该实例后方的 Kibana。
在 Kibana 登录页面,输入用户名和密码,单击 Log In,登录 Kibana。
说明
用户名为 admin,密码为创建实例时设置的密码。
在 Kibana Web 页面左侧导航栏选择 Dev Tools,然后在 Console 左侧代码区域输入GET /test_orders/_search
,再单击执行按钮。
当返回如下类似信息时,表示已成功从 BMQ Topic 中读取数据并写入到 ESCloud Index。
{ "took" : 1125, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2546, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "test_orders", "_type" : "_doc", "_id" : "sBiE2o****WUSExIw", "_score" : 1.0, "_source" : { "order_id" : 1967, "order_product_id" : 293, "order_customer_id" : 493, "order_status" : "3b5", "order_update_time" : "2023-03-13 18:21:44.248" } }, ......