You need to enable JavaScript to run this app.
导航
使用 Flink 进行日志数据分析处理
最近更新时间:2024.04.01 15:24:35首次发布时间:2024.04.01 15:24:35

流式计算 Flink版支持和云搜索服务 ES 联动,可以在 ES 侧创建数据处理任务。数据处理任务主要用在日志处理、分析场景,帮助企业快速发现和解决问题,提高运营效率。本文介绍创建数据处理任务的操作步骤。

背景信息

Flink 可以实时从各种数据源中读取日志数据,并进行复杂数据的处理和分析,且可以灵活地处理各种半结构化数据类型的日志数据,并将处理的结果实时写入 ES。ES 可以实时存储和查询海量的日志数据。
数据处理任务创建并完成配置后,您可以启动任务。启动数据处理任务,将会在任务所属 Flink 项目中生成和启动一个同名 Flink 任务,从而实现日志数据的处理分析并将处理的结果数据写入 ES。

功能限制

  • 目前仅 ES 7.10.2 版本实例支持创建数据处理任务。
  • 目前仅支持 Kafka 数据源。

前提条件

步骤一:创建数据处理任务

  1. 登录云搜索服务控制台

  2. 在顶部导航栏,选择目标实例所在的地域。

  3. 在左侧导航栏选择数据处理,然后单击创建任务

  4. 创建任务对话框,选择 Flink 项目,设置数据处理任务名称和描述,然后单击创建任务
    图片

    参数

    说明

    所属 Flink 项目

    选择数据处理任务所属的 Flink 项目。
    Flink 项目是导入的火山引擎项目,更多信息,请参见Flink 导入项目

    任务名称

    自定义设置数据处理任务的名称。启动该任务后,将在所属 Flink 项目中自动创建一个同名的 Flink 任务。

    • 以字母或数字开头,长度范围为1~64 个字符。
    • 支持英文字母、数字、短横线(-)、下划线(_)和英文句点(.)。

    描述

    任务的描述语句。

步骤二:配置数据处理任务

数据处理任务创建后,您可以为任务配置数据来源、数据去向、数据处理脚本和自定义参数等信息。

  1. 在 ES 控制台的数据处理页面,单击目标任务后方的编辑按钮。
    图片

  2. 配置数据来源和数据去向。
    图片

    分类

    参数

    说明

    数据来源

    服务类型

    目前仅支持 Kafka 数据源。

    实例

    根据实例名称关键词搜索目标 Kafka 实例。如需新建 Kafka 实例,请参见创建 Kafka 实例

    Topic

    从实例中选择目标 Topic。如需新建 Topic,请参见创建 Topic

    Consumer Group

    自定义设置 Group 的名称。数据处理任务正常运行后,会自动在 Kafka 中创建 Group。

    说明

    Kafka 默认开启自动创建 Group 功能。如果您的 Kafka 实例关闭了该功能,则无法自动创建,请重新开启该功能。相关文档,请参见开启自动创建 Group 功能

    读取并发度

    读取数据的并发度,默认与所选 Topic 的分区数一致,支持手动修改。

    默认消费起始位置

    读取数据时的启动模式。

    • 最新:从最新位点开始读取。
    • 最早:从最早分区开始读取。
    • Consumer Group:默认值,根据 Group 读取。
    • 时间戳:从指定时间点读取,需要指定时间。

    数据去向

    服务类型

    固定为云搜索服务

    实例

    根据实例名称关键词搜索目标 ES 实例。

    说明

    • ES 6.7.1 实例和 OpenSearch 实例不支持数据处理任务。
    • 如需新建 ES 7.10.2 实例,请参见创建 ES 实例

    索引

    自定义设置索引名称。
    数据处理任务正常运行后,会自动在 ES 实例中创建该索引。

    写入并发度

    写入数据的并发度,与上游算子保持一致。

    用户名

    连接 ES 实例的用户名称,如“admin”。

    密码

    连接 ES 实例的用户密码。
    如果遗忘 admin 用户密码,可在实例详情页面重置。具体操作,请参见重置访问密码

  3. 配置数据处理任务脚本。
    通过添加 Filter 脚本指定对日志数据的处理方式,比如对数据进行提取和处理,灵活处理各种半结构化数据类型的日志数据。如需了解更多信息,请参见Filter Plugins
    示例脚本:

    filter {
      grok {
        match => {
          "log_message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
        }
      }
      mutate {
        add_field => {
          "show" => "This data will be in the output"
        }
      }
      mutate {
        add_field => {
          "[@metadata][test]" => "Hello"
        }
      }
    }
    
  4. (可选)配置数据处理任务自定义参数。
    您设置的自定义参数将会同步到 Flink 控制台,用于 Flink 任务进行更精细的控制和优化,使其更好地适应不同的业务需求和环境变化。同时,还可以提高任务的可维护性和灵活性。
    如需了解更多信息,请参见配置 Flink 自定义参数

  5. 数据处理任务配置完成后,单击页面右上角的保存按钮。

步骤三:启动数据处理任务

  1. 在 ES 控制台左侧导航栏选择数据处理,单击目标任务后方的启动
    图片

    说明

    系统将自动判断任务所属 Flink 项目中是否存在与写入数据的 ES 实例相同 VPC 的 Flink 资源池:

    • 存在相同 VPC 的 Flink 资源池:生成的同名 Flink 任务直接运行在已存在的相同 VPC 的资源池上。
    • 没有相同 VPC 的 Flink 资源池:需要您确认允许自动在 Flink 控制台创建一个按量付费的 Flink 资源池,命名格式为es-flink-***,然后生成的 Flink 任务运行在该资源池上。创建 Flink 资源池会产生一定费用,详情请参见按量计费
      图片
  2. 在 ES 控制台查看任务状态。
    任务初始状态显示为启动中,当状态变为运行中,则表示任务已正常运行。Flink 控制台和 ES 控制台均可控制任务的启停。
  3. Flink 控制台任务运维 > 任务管理页面,查看自动生成的数据处理任务的状态。
    图片

结果验证

数据处理任务正常运行后,如果 Kafka 数据源存在且还在持续写入日志数据,那么您可以查看通过 Flink UI 查看数据处理任务的输出结果;也可以连接 ES 实例查看索引中的文档数据。

  1. Flink 控制台任务运维 > 任务管理页面,单击数据处理任务后方的 Flink UI
    图片
  2. 在 Flink Dashboard 页面,查看任务运行详情。
    图片

查看 ES 索引数据

  1. ES 控制台实例列表页面,单击目标实例操作列的 Kibana
    选择使用公网地址登录 Kibana,需要提前为 Kibana 开启公网访问,请参见配置 Kibana 公网访问
    图片

  2. 在 Kibana 登录页面输入用户名和密码,然后登录。
    用户名为 admin,密码为创建实例时设置的密码。如果遗忘 admin 用户密码,可在实例详情页面重置。具体操作,请参见重置访问密码

  3. 在 Kibana 平台,单击左上角菜单栏,然后选择 Management > Dev Tools

  4. 执行以下命令,查看目标索引和文档数量。

    GET /_cat/indices?v
    

    数据处理任务运行后,正常情况下会生成索引,并向其中写入数据。
    图片

  5. 执行以下命令,查看索引文档数据。

    GET /<index-name>/_search
    

    替换命令中的索引名称然后执行,将会返回索引中的数据,然后查看并判断处理结果是否满足要求。
    图片