You need to enable JavaScript to run this app.
导航
读取云原生消息引擎 BMQ 数据写入云搜索服务 Cloud Search
最近更新时间:2024.04.07 19:15:43首次发布时间:2023.12.26 14:21:00

本文介绍如何通过一个简单的 Flink SQL 任务,实现从 BMQ Topic 中读取实时数据,然后写入 ESCloud Index 中。

流程介绍

图片

  1. 准备数据源 BMQ Topic。
    您需要在云原生消息引擎控制台创建资源池、Topic 和 Consumer Group,并获取资源池接入点地址。
  2. 准备数据目的 ESCloud Index。
    您需要在云搜索服务控制台购买实例并获取实例的访问地址。无需手动新建 Index,系统的动态映射能力会自动创建索引。
  3. 开发 Flink SQL 任务。
    当您准备好数据源和数据目的后,便可以在流式计算 Flink 控制台开发 SQL 任务,通过简单的代码逻辑实现从 BMQ 读取数据写入到 ESCloud 的目的。

前提条件

为保证网络访问安全,本文所使用的云产品服务均使用内网访问方式,因此要求 BMQ 资源池、云搜索服务实例和 Flink 资源池均处于相同地域的同一个 VPC 内。
您可以在创建云资源前,先创建私有网络。相关文档,请参见创建私有网络创建子网

步骤一:准备数据源 BMQ Topic

  1. 登录云原生消息引擎控制台

  2. 在顶部菜单栏选择目标地域,然后从项目管理页面进入目标项目。

  3. 创建资源池。

    1. 在项目左侧导航栏选择资源管理,然后单击创建资源池

    2. 创建资源池页面,设置资源池的基本信息、资源配置、网络信息、Topic 配置等关键参数,然后单击下一步:确认订单
      图片

      一级配置项

      二级配置项

      说明

      基本信息

      资源类型

      默认为通用资源

      资源模式

      默认为 VCI 模式,即在通用资源-VCI 资源域上创建 BMQ 资源池。

      计费类型

      选择资源池计费的类型。目前仅支持按量计费

      资源池名称

      输入资源池名称。

      • 由小写字母、数字和短横线(-)构成。
      • 长度为 1~64 个字符。

      地域及可用区部署

      地域已选定,不可更改。
      根据业务的网络延迟、高可用容灾等需求,选择单可用区或者多可用区

      • 单可用区:选择一个合适的可用区即可。
      • 多可用区:默认选中多个可用区。

      所属项目

      从下拉列表中选择资源池所属项目。

      资源配置

      计算规格

      根据业务场景预估需要的 Topic 数量、Consumer Group 数量、分区数量等,选择适合的资源池规格。

      存储规格

      默认使用 CloudFS 加速存储,无需额外配置。

      网络信息

      私有网络

      为保证内网顺利访问,建议选择已有云上业务的地域位置所在的 VPC。同一个 VPC 内,不同可用区子网之间是互通的。
      如果还未创建私有网络,请参见创建私有网络

      子网

      从下拉列表中选择子网。系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。

      说明

      如果是多可用区部署的资源池,需要为选择的所有可用区分别配置子网。

      安全组

      从下拉列表中选择安全组。

      Topic 配置

      消息保留时长

      为该资源池下的所有 Topic 设置默认消息保留时长。

      • 默认为 72 小时,可按小时粒度调节。
      • 支持设置的留存范围为 1~336 小时(14天)。
    3. 订单详情页面,确认资源池配置信息,然后阅读并勾选产品相关协议,再单击立即购买
      提交购买订单后,您可以返回资源池页面。购买的资源池显示为初始化中,初始化完成后显示为运行中

  4. 获取接入点地址。

    1. 资源池管理页面,单击资源池名称,进入资源池详情页面。
    2. 在资源池详情页面的服务访问页签下,查看并复制资源池的用户接入点地址。
      图片
  5. 创建 Topic。

    1. 在资源池详情左侧导航栏选择 Topic,然后单击创建Topic

    2. 创建 Topic 对话框,设置名称、分区数、消息保留时长等,然后单击确定
      图片

      配置

      说明

      Topic 名称

      输入 Topic 名称。
      只能由小写英文字符、数字、下划线(_)和短横线(-)组成,长度为 3~64 个字符。

      描述

      填写 Topic 的描述语言。

      分区数

      输入分区数。
      默认为 12,输入框下展示剩余可用分区数。

      消息保留时长

      数据在 Topic 中的保留时长。

      • 默认与资源池设置的全局消息保留时长保持一致,但也可按小时粒度自主调节。
      • 支持设置的留存范围为 1~336 小时(14 天)。
  6. 创建 Consumer Group。

    1. 在资源池详情左侧导航栏选择 Consumer Group,然后单击创建 Group

    2. 创建 Group对话框,设置 Group 名称和描述,然后单击确定
      图片

      配置

      说明

      Group 名称

      自定义设置 Group 名称。

      • 由小写英文字母、数字、短横线(-)和下划线(_)构成。
      • 长度为 3~64 个字符。

      描述

      输入描述信息。非必填。

步骤二:准备数据目的 ESCloud Index

  1. 登录云搜索服务控制台
  2. 在顶部导航栏选择目标地域。
  3. 创建实例。
    1. 实例列表页面,单击创建实例

    2. 创建实例页面,设置实例名称,可用区、版本、节点详情和访问方式等参数,然后单击确认订单
      图片

      配置

      说明

      实例名称

      自定义实例名称,非必填项。如果不设置实例名称,默认生成与实例ID相同的名称。
      自定义设置实例名称时,取值说明如下:

      • 不能以数字、短横线(-)开头。
      • 只能包含中文、数字、字母、短横线(-)和下划线(_)。
      • 长度在 1~128 个字符内。

      可用区

      选择需要创建实例的可用区。

      版本

      兼容 6.7.17.10.2 Elasticsearch 版本。
      此处选择 7.10.2 版本为例。

      部署节点类型

      支持以下节点类型:

      • 数据节点:默认配置 3 个,存储规格为 100 GiB 的数据节点。
      • 专有主节点:选择是否启用专有主节点,专有主节点用于保障实例稳定性。启用专有主节点后,需要选择配置节点规格。
      • Kibana 节点:系统免费提供一个 1 核 2 GiB 的 Kibana 节点,不支持修改。

      节点详情

      配置数据节点或专有主节点规格。

      • 节点规格:不同规格包含不同的 CPU 核数和内存,请根据业务需求选择合理的节点规格。
      • 存储类型:目前仅支持 ESSD-PL0
      • 存储规格:设置存储规格,范围为 20~10000 GiB。
      • 节点数量:根据业务需求设置节点的数量。

      说明

      • 不启用专有主节点:建议生产环境至少配置 3 个数据节点,配置 2 个节点时存在脑裂风险,支持配置的数量范围为 1~70。
      • 启用专有主节点:支持配置的数据节点数量范围为 1~70,无其他限制;专有主节点固定为 3 个。

      访问方式

      支持售卖区访问、存储内网访问、公共服务区访问三种方式 。

      • 售卖区访问 :在私有网络 VPC 内可以访问实例,本文选择此种访问方式。
      • 存储内网访问:默认支持物理机访问,不可修改。
      • 公共服务区访问:处于公共服务区的机器可以访问。

      私有网络

      当选择售卖区访问云搜索服务实例时,需要选择特定 VPC,才能访问实例。为保证内网顺利访问,此处需要选择和已创建的 BMQ 资源池所在的 VPC 保持一致。

      说明

      • 实例创建成功后,私有网络无法更改。
      • 同一个 VPC 内,不同可用区子网之间是互通的。

      子网

      系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。

      所属项目

      选择云搜索实例所属的项目,便于管理资源,非必选项。

      用户名

      默认为 admin,不能修改。用于 Kibana 页面登录,以及实例登录认证。

      登录密码

      admin 账号对应的密码,请按照规则设置,并妥善保存。

      • 密码至少包含大写字母、小写字母、数字和特殊字符中的三种,长度为 8~32 个字符。
      • 支持_#!@$%^&*()+=-特殊字符,不可以包含空格和中文。

      确认密码

      输入上一步输入的密码。

      传输协议

      选择传输协议为 HTTP 或者 HTTPS

      注意

      在 Flink 任务中,目前暂时不支持 HTTPS 传输协议的实例,此处选择为 HTTP。如果在 Flink 任务中配置 ES 的 HTTPS 访问地址,任务将运行失败。

      删除保护

      选择是否启用删除保护,默认不启用。
      删除保护功能用于防止实例被意外删除,因此在该选项启用期间,您无法删除实例。

      计费类型

      选择实例的计费类型,支持按量计费包年包月。关于计费的详细信息,请参见计费概述

    3. 确认订单页面,查看实例配置详情,阅读并勾选相关产品协议,然后单击立即购买
      实例购买成功后,您可以单击去控制台,页面会自动跳转至实例列表页,方便您查看刚创建的实例。实例从创建中变为运行中,则表示实例创建成功。

  4. 获取实例访问地址。
    1. 实例列表页面,单击实例名称,进入实例详情页面。
    2. 实例信息页签下的服务访问区域,查看并复制实例访问地址。
      Flink SQL 任务需要设置该地址,以访问实例。
      图片
    3. 您还可以单击 Kibana 公网访问地址后方的开启公网访问,然后勾选需要绑定的公网 IP。
      为 Kibana 开启公网访问能力,以便后续可以登录 Kibana Web 页面,以查询 Index 中的数据。如果您没有可用的公网 IP,可前往控制台创建。相关文档,请参见申请公网 IP

  1. 登录流式计算 Flink 版控制台
  2. 在顶部菜单栏选择目标地域,然后从项目管理页面进入项目。
  3. 创建资源池。
    1. 在项目左侧导航栏选择资源管理,然后单击资源池页签,再单击创建资源池

    2. 创建通用资源池页面,设置资源池基本信息、网络信息、存储信息等参数,然后单击下一步:确认订单
      图片

      一级配置项

      二级配置项

      说明

      基本信息

      资源类型

      默认为通用资源

      资源模式

      默认为 VCI 模式,即在通用资源-VCI 资源域上部署 Flink 资源池。

      计费类型

      在通用资源-VCI 资源域部署 Flink 资源池支持按量计费包年包月计费类型,请按需选择。
      如需了解计费详情,请参见按量计费包年包月
      如果选择包年包月计费类型,还需要选择购买时长,以及确认是否需要自动续费

      资源池名称

      输入要创建的资源池名称。

      • 全局唯一且不能为空字符串。
      • 长度为 1~63 个字符。
      • 支持英文小写字母、数字和短横线(-);且名称开头和结尾必须是字母或数字。

      地域及可用区

      • 地域已选定,不可更改。与 BMQ 资源池、云搜索服务实例保持在同一个地域。
      • 根据业务的网络延迟、高可用等需求,选择一个合适的可用区。

      所属项目

      从下拉列表中选择资源池所属项目。

      资源配置

      计算规格

      如果创建包年包月计费类型,则需要为 Flink 资源池手动配置资源,资源的基础单位为 CU,1 CU 的含义为:CPU 1 核、内存 4 GB。
      请根据实际需求设置资源量。

      网络信息

      私有网络

      从下拉列表中选择私有网络。 需要与 BMQ 资源池、云搜索服务实例保持在相同 VPC 内。

      子网

      从下拉列表中选择子网。
      系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。

      安全组

      从下拉列表中选择安全组。

      存储信息

      TOS 对象存储

      默认为产品初始化时关联的 TOS,不支持修改。

    3. 详情页面,查看资源池配置详情,阅读并勾选 Flink 相关协议,然后单击立即购买
      您可以返回资源池列表页面,查看创建进度。创建完成后显示为运行中

  4. 开发任务。
    1. 在项目左侧导航栏选择任务开发 > Jupyter lab,然后单击加号按钮创建任务,也可以单击 Launcher 页签下的 Flink Stream SQL 区块。
      图片

    2. 创建任务对话框,设置任务名称、类型、文件夹和引擎版本,然后单击确定
      图片

      配置

      说明

      任务名称

      自定义设置任务的名称。
      名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。

      任务类型

      选择 Flink 任务 > Flink Stream > SQL

      所在文件夹

      系统提供文件夹管理功能,用于分类管理任务。您可以直接选择系统默认存在的数据开发文件夹
      如果您有自建文件夹管理任务的要求,请单击创建文件夹的文件夹按钮,然后创建文件夹。

      引擎版本

      目前支持 Flink 1.11-volcanoFlink 1.16-volcano 版本。

      说明

      本文选择引擎版本为 Flink 1.11。
      如果您设置为 Flink 1.16,那么在后面的 Flink SQL 任务中需要额外添加配置('properties.enable.idempotence'='false'),用来关闭事务消息,否则任务将会运行失败。

      任务描述

      输入任务的描述语句,一般描述任务实现的功能。

    3. 在任务编辑区编写 SQL 任务的业务逻辑代码。
      此处您可以直接拷贝并使用以下代码。代码实现将 datagen 连接器实时生成的随机数先写入 BMQ Topic 中;然后读取 BMQ Topic 数据并输出到 ESCloud Index 中。

      注意

      • 通过 Kafka 连接器往 BMQ Topic 中读写数据时,如果使用 Flink 1.11 引擎版本,则可以直接使用以下 Demo。如果使用 Flink 1.16 引擎版本时,需要额外添加配置('properties.enable.idempotence'='false'),用来关闭事务消息,否则任务将会运行失败。
      • 往 Elasticsearch 中写入数据时,如果使用 elasticsearch-7 连接器,则可以直接使用以下 Demo。如果使用elasticsearch-6 连接器,则需要额外添加配置('document-type'='test'),用来指明文档类型。
      • 使用的 Elasticsearch 连接器版本需要和您创建的云搜索实例版本保持一致,否则可能出现版本不兼容的情况。示例代码中使用了 elasticsearch-7 连接器,那么对应的云搜索实例版本应为 7.10.2。
      • 目前在 Flink 任务中暂时不支持 HTTPS 传输协议的 ES 实例。
      create table orders_datagen (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time as localtimestamp
      ) WITH (
       'connector' = 'datagen',
       'rows-per-second'='1',
       'fields.order_status.length' = '3',
       'fields.order_id.min' = '1',
       'fields.order_id.max' = '10000',
       'fields.order_product_id.min' = '1',
       'fields.order_product_id.max' = '1000',
       'fields.order_customer_id.min' = '1',
       'fields.order_customer_id.max' = '1000'
      );
      
      
      create table bmq_table (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time timestamp
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'topic-a',  --Topic名称。
        'properties.bootstrap.servers' = 'bmq-****q5-1.openstudio.ivolces.com:9092',  --资源池接入点地址。
        'properties.group.id' = 'group-a',    -- BMQ的Consumer Group名称。需要提前创建,否则不能正常提交Offset。
        'scan.startup.mode' = 'earliest-offset',
        'properties.flink.partition-discovery.interval-millis' = '60000',  --动态检测分区的时间间隔。
        'format' = 'json'
      );
      
      
      insert into bmq_table 
      select * from orders_datagen;
      
      
      create table escloud_table (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time timestamp
      ) WITH (
        'connector' = 'elasticsearch-7',  --连接器版本,需要与实例版本保持相同,以免出现不兼容问题。
        'hosts' = 'http://elasticsearch-pt8y***g3mi.escloud.ivolces.com:9200',   --实例访问地址。Flink 任务中暂不支持 HTTPS 传输协议的ES实例。
        'index' = 'test_orders',    --Index名称。无需提前创建索引,实例将会自动创建。
        'username' = 'admin',       --实例访问用户名称。
        'password'='cd****456'   --用户密码。
      );
      
      
      insert into escloud_table
      select * from bmq_table;
      
    4. 单击格式化按钮,系统自动调整SQL代码格式。
      系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。

    5. 在代码编辑区上方,单击验证按钮。
      系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
      图片

  5. 启动任务。
    1. 在任务编辑区,单击上方的上线按钮。

    2. 任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定
      系统会提示任务上线成功,可以前往任务管理页面查看。
      图片

      配置

      说明

      运行资源池

      从下拉列表中选择任务运行的 Flink 资源池。

      任务优先级

      系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。
      任务优先级决定了任务内部的调度顺序,优先级高的任务先被调度,即 L3 先于 L4 被调度。

      调度策略

      根据需求配置任务调度策略:

      • GANG:保证任务的所有实例被一起调度,即当剩余资源满足任务正常运行所需资源时才进行分配;不满足所需资源则不分配。
        该策略不会出现分配资源后,任务却不能启动的现象,解决了资源死锁问题。
      • DRF:从多维资源考虑,更为合理地将资源公平分配给资源池内的各个任务,从而提升利用率。
        例如:剩余10 核 40 GB 的资源,A 任务需要10 核 20 GB 资源;B 任务需要 2 核 8 GB 的资源。如果分配给 A,剩余 0 核 20 GB 资源无法被利用;DRF 策略会选择分配给 B,剩下 8 核 32 GB 可以继续给后来任务使用。

      调度时长

      设置为 GANG 调度策略时,需要设置调度时长。
      如果超过调度时长,任务就会调度失败。如果设置为 0,则会一直重试。

    3. 在项目左侧导航栏选择任务运维 > 任务管理,然后单击目标任务后方的启动按钮。

    4. 启动任务对话框,选择任务启动方式,然后单击确定
      任务启动需要一定时长,请耐心等待。启动成功后,状态为运行中
      图片

      配置

      说明

      启动方式

      请根据实际情况选择任务启动方式:

      • 从最新状态启动:以最新的 Checkpoint 或 Savepoint 启动。
      • 全新启动:不使用 Checkpoint 或 Savepoint,直接启动。
      • 指定快照启动:指定目标快照(Savepoint)启动。

      说明

      首次上线的任务,只能是全新启动方式。

      参数配置

      任务携带在开发侧的并行度、Task Manager 和 Job Manager 的资源配置。在启动任务时支持您更新配置并快速生效。

      说明

      更新参数配置并启动任务后,将新增一个任务版本,并将最新配置同步到任务开发侧。

      • 并行度:任务全局并发数。
      • 单个 TaskManager CPU 数:单个 TaskManager 的 CPU 核数。
      • 单个 TaskManager 内存大小:单个 TaskManager 占用的内存大小。
      • 单个 TaskManager slot 数:单个 TaskManager 的 Slot 数量。
      • JobManager CPU 数:单个 JobManager 的 CPU 核数。
      • JobManager 内存大小:单个 JobManager 占用的内存大小。

      更多设置

      在任务开发变更时新增或修改算子,可能会导致任务无法从快照恢复,此时您可以选择启用允许忽略部分算子状态功能,保证任务能正常运行。
      图片

      注意

      • 仅当选择指定快照启动从最新状态启动方式时,支持勾选忽略部分算子状态。
      • 当您选择全新启动方式时,不支持忽略算子状态。

结果验证

您可以登录 Kibana Web 页面,查询索引(test_orders)中是否被成功写入随机生成的数据。

  1. 登录云搜索服务控制台

  2. 在顶部导航栏选择目标地域。

  3. 实例列表页面,选择目标实例,然后单击该示例后方的 Kibana
    图片

  4. 在 Kibana 登录页面,输入用户名和密码,单击 Log In,登录 Kibana。

    说明

    用户名为 admin,密码为创建实例时设置的密码。

  5. 在 Kibana Web 页面左侧导航栏选择 Dev Tools,然后在 Console 左侧代码区域输入GET /test_orders/_search,再单击执行按钮。
    当返回如下类似信息时,表示已成功从 BMQ Topic 中读取数据并写入到 ESCloud Index。

    {
      "took" : 1125,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 2546,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "test_orders",
            "_type" : "_doc",
            "_id" : "sBiE2o****WUSExIw",
            "_score" : 1.0,
            "_source" : {
              "order_id" : 1967,
              "order_product_id" : 293,
              "order_customer_id" : 493,
              "order_status" : "3b5",
              "order_update_time" : "2023-03-13 18:21:44.248"
            }
          },
          ......
    

    图片