流式计算 Flink版支持和云搜索服务 ES 联动,可以在 ES 侧创建数据处理任务。数据处理任务主要用在日志处理、分析场景,帮助企业快速发现和解决问题,提高运营效率。本文介绍创建数据处理任务的操作步骤。
Flink 可以实时从各种数据源中读取日志数据,并进行复杂数据的处理和分析,且可以灵活地处理各种半结构化数据类型的日志数据,并将处理的结果实时写入 ES。ES 可以实时存储和查询海量的日志数据。
数据处理任务创建并完成配置后,您可以启动任务。启动数据处理任务,将会在任务所属 Flink 项目中生成和启动一个同名 Flink 任务,从而实现日志数据的处理分析并将处理的结果数据写入 ES。
登录云搜索服务控制台。
在顶部导航栏,选择目标实例所在的地域。
在左侧导航栏选择数据处理,然后单击创建任务。
在创建任务对话框,选择 Flink 项目,设置数据处理任务名称和描述,然后单击创建任务。
参数 | 说明 |
---|---|
所属 Flink 项目 | 选择数据处理任务所属的 Flink 项目。 |
任务名称 | 自定义设置数据处理任务的名称。启动该任务后,将在所属 Flink 项目中自动创建一个同名的 Flink 任务。
|
描述 | 任务的描述语句。 |
数据处理任务创建后,您可以为任务配置数据来源、数据去向、数据处理脚本和自定义参数等信息。
在 ES 控制台的数据处理页面,单击目标任务后方的编辑按钮。
配置数据来源和数据去向。
分类 | 参数 | 说明 |
---|---|---|
数据来源 | 服务类型 | 目前仅支持 Kafka 数据源。 |
实例 | 根据实例名称关键词搜索目标 Kafka 实例。如需新建 Kafka 实例,请参见创建 Kafka 实例。 | |
Topic | 从实例中选择目标 Topic。如需新建 Topic,请参见创建 Topic。 | |
Consumer Group | 自定义设置 Group 的名称。数据处理任务正常运行后,会自动在 Kafka 中创建 Group。 说明 Kafka 默认开启自动创建 Group 功能。如果您的 Kafka 实例关闭了该功能,则无法自动创建,请重新开启该功能。相关文档,请参见开启自动创建 Group 功能。 | |
读取并发度 | 读取数据的并发度,默认与所选 Topic 的分区数一致,支持手动修改。 | |
默认消费起始位置 | 读取数据时的启动模式。
| |
数据去向 | 服务类型 | 固定为云搜索服务。 |
实例 | 根据实例名称关键词搜索目标 ES 实例。 说明
| |
索引 | 自定义设置索引名称。 | |
写入并发度 | 写入数据的并发度,与上游算子保持一致。 | |
用户名 | 连接 ES 实例的用户名称,如“admin”。 | |
密码 | 连接 ES 实例的用户密码。 |
配置数据处理任务脚本。
通过添加 Filter 脚本指定对日志数据的处理方式,比如对数据进行提取和处理,灵活处理各种半结构化数据类型的日志数据。如需了解更多信息,请参见Filter Plugins。
示例脚本:
filter { grok { match => { "log_message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } mutate { add_field => { "show" => "This data will be in the output" } } mutate { add_field => { "[@metadata][test]" => "Hello" } } }
(可选)配置数据处理任务自定义参数。
您设置的自定义参数将会同步到 Flink 控制台,用于 Flink 任务进行更精细的控制和优化,使其更好地适应不同的业务需求和环境变化。同时,还可以提高任务的可维护性和灵活性。
如需了解更多信息,请参见配置 Flink 自定义参数。
数据处理任务配置完成后,单击页面右上角的保存按钮。
说明
系统将自动判断任务所属 Flink 项目中是否存在与写入数据的 ES 实例相同 VPC 的 Flink 资源池:
es-flink-***
,然后生成的 Flink 任务运行在该资源池上。创建 Flink 资源池会产生一定费用,详情请参见按量计费。数据处理任务正常运行后,如果 Kafka 数据源存在且还在持续写入日志数据,那么您可以查看通过 Flink UI 查看数据处理任务的输出结果;也可以连接 ES 实例查看索引中的文档数据。
在 ES 控制台的实例列表页面,单击目标实例操作列的 Kibana。
选择使用公网地址登录 Kibana,需要提前为 Kibana 开启公网访问,请参见配置 Kibana 公网访问。
在 Kibana 登录页面输入用户名和密码,然后登录。
用户名为 admin,密码为创建实例时设置的密码。如果遗忘 admin 用户密码,可在实例详情页面重置。具体操作,请参见重置访问密码。
在 Kibana 平台,单击左上角菜单栏,然后选择 Management > Dev Tools。
执行以下命令,查看目标索引和文档数量。
GET /_cat/indices?v
数据处理任务运行后,正常情况下会生成索引,并向其中写入数据。
执行以下命令,查看索引文档数据。
GET /<index-name>/_search
替换命令中的索引名称然后执行,将会返回索引中的数据,然后查看并判断处理结果是否满足要求。