日志服务提供 Kafka 协议消费功能,可以将一个日志主题当作一个 Kafka Topic 来消费,每条日志对应一条 Kafka 消息。您可以使用 Flink kafka 连接器连接日志服务,通过 Flink 任务将日志服务中采集的日志数据消费到下游的大数据组件或者数据仓库。
本文通过 Flink SQL 任务,实现读取 TLS 主题中的日志数据,然后写入到 ESCloud 索引中。
为保证网络访问安全,本文所使用的云产品服务均使用内网访问方式,因此要求 TLS 项目、云搜索服务实例和 Flink 资源池均处于相同地域的同一个 VPC 内。
您可以在创建云资源前,先创建私有网络。相关文档,请参见创建私有网络和创建子网。
为了实现用 Flink SQL 任务消费 TLS 日志,首先需要先完成 TLS 相关准备工作。如,创建日志项目、创建日志主题、开通主题的 Kafka 协议消费、获取与 TLS 交互需要用到的连接地址。
登录日志服务控制台。
在顶部导航栏选择目标地域。
创建日志项目。
创建日志主题。
在项目详情页面的日志主题区域,单击创建日志主题。
在创建日志主题对话框,设置主题名称、日志存储时长、日志分区数量等关键参数,然后单击确定。
配置 | 说明 |
---|---|
主题名称 | 自定义设置日志主题的名称。 |
日志存储时长 | 日志在日志服务中的保存时间,超过指定的日志存储时长后,此日志主题中的过期日志会被自动清除。 |
日志分区数量 | 日志分区的数量,默认创建 1 个分区,取值范围为1~10。 每个分区提供的写入能力为 5 MiB/s、500 次/s,读取能力为 10 MiB/s、100 次/s。 |
自动分裂日志分区 | 是否开启分区的自动分裂功能。
|
最大分裂数 | 分区的最大分裂数,即分区分裂后,所有分区的最大数量。取值范围为 1~10,默认为 10。 |
描述 | 日志主题的简单描述。 |
开通 Kafka 协议消费。
获取接入点地址。
使用 Flink SQL 任务消费 TLS 日志,需要准备一个数据写入的目的源,本文使用的是 ESCloud 索引。请按照以下步骤创建实例,并获取实例的访问地址。
在实例列表页面,单击创建实例。
在创建实例页面,设置实例名称,可用区版本、节点详情和访问方式等参数,然后单击确认订单。
配置 | 说明 |
---|---|
实例名称 | 自定义实例名称,非必填项。如果不设置实例名称,默认生成与实例 ID 相同的名称。
|
可用区 | 选择需要创建实例的可用区。 |
版本 | 兼容 6.7.1 及 7.10.2 Elasticsearch 版本。 |
部署节点类型 | 支持以下节点类型:
|
节点详情 | 配置数据节点或专有主节点规格。
说明
|
访问方式 | 支持售卖区访问、存储内网访问、公共服务区访问三种方式 。
|
私有网络 | 当选择售卖区访问方式访问搜索服务实例时,需要选择特定 VPC,才能访问实例。为保证内网顺利访问,此处需要选择和 Flink 资源池所在的 VPC 保持一致。 |
子网 | 系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。 |
所属项目 | 选择云搜索实例所属的项目,便于管理资源,非必选项。 |
用户名 | 默认为 admin,不能修改。用于 Kibana 页面登录,以及实例登录认证。 |
登录密码 | admin 账号对应的密码,请按照规则设置,并妥善保存。
|
确认密码 | 输入上一步输入的密码。 |
传输协议 | 选择传输协议为 HTTP 或者 HTTPS。 注意 在 Flink 任务中,目前暂时不支持 HTTPS 传输协议的实例,此处选择为 HTTP。如果在 Flink 任务中配置 ESCloud 的 HTTPS 访问地址,任务将运行失败。 |
删除保护 | 选择是否启用删除保护,默认不启用。 |
计费类型 | 选择实例的计费类型,支持按量计费和包年包月。关于计费的详细信息,请参见计费概述。 |
在确认订单页面,查看实例配置详情,阅读并勾选相关产品协议,然后单击立即购买,即可完成实例的创建。
实例购买成功后,您可以单击去控制台,页面会自动跳转至实例列表页,方便您查看刚创建的实例。实例从创建中变为运行中,则表示实例创建成功。
本文创建两个独立的 Flink SQL 任务。一个任务负责生产消息,将随机数据写入 TLS 主题;另一个负责消费消息,从 TLS 主题读取数据并写入到 ESCloud Index 中。
在顶部菜单栏选择目标地域,然后从项目管理页面进入项目。
在项目左侧导航栏选择资源管理,然后单击资源池页签,再单击创建资源池。
在创建通用资源池页面,设置资源池基本信息、网络信息、存储信息等参数,然后单击下一步:确认订单。
一级配置项 | 二级配置项 | 说明 |
---|---|---|
基本信息 | 资源类型 | 默认为通用资源。 |
资源模式 | 默认为 VCI 模式,即在通用资源-VCI 资源域上部署 Flink 资源池。 | |
计费类型 | 在通用资源-VCI 资源域部署 Flink 资源池支持按量计费和包年包月计费类型,请按需选择。 | |
资源池名称 | 输入要创建的资源池名称。
| |
地域及可用区 |
| |
所属项目 | 从下拉列表中选择资源池所属项目。 | |
资源配置 | 计算规格 | 如果创建包年包月计费类型,则需要为 Flink 资源池手动配置资源,资源的基础单位为 CU,1 CU 的含义为:CPU 1 核、内存 4 GB。 |
网络信息 | 私有网络 | 从下拉列表中选择私有网络。 需要与日志项目、云搜索服务实例保持在相同 VPC 内。 |
子网 | 从下拉列表中选择子网。 | |
安全组 | 从下拉列表中选择安全组。 | |
存储信息 | TOS 对象存储 | 默认为产品初始化时关联的 TOS,不支持修改。 |
在详情页面,查看资源池配置详情,阅读并勾选 Flink 相关协议,然后单击立即购买。
您可以返回资源池列表页面,查看创建进度。创建完成后显示为运行中。
按照以下步骤开发并运行负责生产消息的 Flink SQL 任务,实现将随机数据写入 TLS 主题。
在项目左侧导航栏选择任务开发 > Jupyter lab,然后单击加号按钮创建任务,也可以单击 Launcher 页签下的 Flink Stream SQL 区块。
在创建任务对话框,设置任务名称、类型、文件夹和引擎版本,然后单击确定。
配置 | 说明 |
---|---|
任务名称 | 自定义设置任务的名称,如“Datagen-TLS9094”。 |
任务类型 | 选择 Flink 任务 > Flink Stream > SQL。 |
所在文件夹 | 系统提供文件夹管理功能,用于分类管理任务。您可以直接选择系统默认存在的数据开发文件夹。 |
引擎版本 | 目前支持 Flink 1.11-volcano 和 Flink 1.16-volcano 版本。 注意 此处建议选择 Flink 1.11-volcano 版本。由于 Kafka Client 默认开启了事务消息,需要在 SQL 语句中通过设置关闭事务消息,而 Flink 1.16-volcano 版本暂不支持。 |
任务描述 | 输入任务的描述语句,一般描述任务实现的功能。 |
在任务编辑区编写生产消息的 SQL 任务的业务逻辑代码。
您可以直接使用以下代码,修改 TLS 相关信息即可。代码实现将 Datagen 连接器实时生成的随机数写入 TLS 主题中。
create table orders_datagen ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.order_status.length' = '3', 'fields.order_id.min' = '1', 'fields.order_id.max' = '10000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '1000', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); create table kafka_table ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'kafka', 'topic' = '88999d84-b*****a7e809edba7', --TLS 主题 ID。 'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9094', --TLS 日志项目接入点地址,作为消息接收端时端口固定为 9094。 'format' = 'json', 'properties.enable.idempotence' = 'false', --关闭事务消息。 'properties.security.protocol' = 'SASL_SSL', --SASL_SSL 连接协议,保证日志传输安全。 'properties.sasl.mechanism' = 'PLAIN', 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{SASL_user}" password="{SASL_password}";' --Kafka SASL 用户名和密码。 ); insert into kafka_table select * from orders_datagen;
代码中使用了 Datagen 连接器,用以生成随机数据,模拟需要写入的日志数据。如需了解该连接器更多信息,请参见Datagen。
代码中 TLS 日志主题采用 Kafka 协议采集数据,请根据以下表格填写 TLS 相关信息。
配置项 | 说明 |
---|---|
connector | 指定使用的连接器。 |
topic | 指定日志写入的目标 TLS 主题,取值为主题 ID。 |
properties.bootstrap.servers | 指定 TLS 日志项目的访问地址,格式为
|
format | 用来反序列化 Kafka 消息体(value)时使用的格式。此处设置为 json。 |
properties.enable.idempotence | 是否启用 Kafka Client 的事务消息能力,此处设置为 false,以关闭事务消息。 |
properties.security.protocol | 使用 SASL_SSL 连接协议,保证日志传输安全。 |
properties.sasl.mechanism | 将 SASL 机制配置为 PLAIN。 |
properties.sasl.jaas.config | 配置 JAAS。
注意 建议配置为子用户(IAM 用户)的密钥,以降低安全风险。 |
单击格式化按钮,系统自动调整SQL代码格式。
系统将自动美化您的 SQL 语句,使得语句更加美观、整洁、可读。
在代码编辑区上方,单击验证按钮。
系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success。
(可选)调试任务。
验证功能只能校验 SQL 语法正确性,无法完全规避代码运行中可能出现的错误,在任务上线前,建议您进行任务调试。
说明
Flink Stream 类型任务选择执行方式为 STREAMING;Flink Batch 类型任务选择执行方式为 BATCH。
启动任务。
在任务编辑区,单击上方的上线按钮。
在任务上线设置对话框,选择 Flink 资源池、设置任务优先级和调度策略,然后单击确定。
系统会提示任务上线成功,可以前往任务管理页面查看。
配置 | 说明 |
---|---|
运行资资源池 | 从下拉列表中选择任务运行的 Flink 资源池。 |
任务优先级 | 系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。 |
调度策略 | 根据需求配置任务调度策略:
|
调度时长 | 设置为 GANG 调度策略时,需要设置调度时长。 |
在项目左侧导航栏选择任务运维 > 任务管理,然后单击目标任务后方的启动按钮。
在启动任务对话框,选择任务启动方式,然后单击确定。
任务启动需要一定时长,请耐心等待。启动成功后,状态为运行中。
配置 | 说明 |
---|---|
启动方式 | 请根据实际情况选择任务启动方式:
说明 首次上线的任务,只能是全新启动方式。 |
参数配置 | 任务携带在开发侧的并行度、Task Manager 和 Job Manager 的资源配置。在启动任务时支持您更新配置并快速生效。 说明 更新参数配置并启动任务后,将新增一个任务版本,并将最新配置同步到任务开发侧。
|
更多设置 | 在任务开发变更时新增或修改算子,可能会导致任务无法从快照恢复,此时您可以选择启用允许忽略部分算子状态功能,保证任务能正常运行。 注意
|
消费消息的开发流程与生产消息的开发流程类似,请参考上面步骤自主完成任务开发、调试、运行等步骤,实现读取 TLS 主题数据写入到 ESCloud Index。
此处仅介绍注意事项和提供示例代码:
创建任务时,任务名称可以命名为TLS9093-ESCloud
,引擎版本仍选择 Flink 1.11-volcano。
任务示例代码如下,请按实际情况填写 TLS 和 ESCloud 的连接信息。
create table kafka_table ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'kafka', 'topic' = 'out-8899**e809edba7', --TLS 主题的 Kafka 协议消费主题 ID。 'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9093', --TLS 日志项目接入点地址,端口固定为 9093。 'scan.startup.mode' = 'earliest-offset', 'properties.flink.partition-discovery.interval-millis' = '60000', --动态检测分区的时间间隔。 'format' = 'json', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.mechanism' = 'PLAIN', 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{SASL_user}" password="{SASL_password}";' ); --Kafka SASL 用户名和密码。 create table escloud_table ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'elasticsearch-7', --连接器版本,需要与实例版本保持相同,以免出现不兼容问题。 'hosts' = 'http://elasticsearch-g**m.escloud.ivolces.com:9200', --实例访问地址。 'index' = 'test_orders_3', --Index 名称。无需提前创建索引,实例将会自动创建。 'username' = 'admin', --实例访问用户名称。 'password'='cdq***56' --用户密码。 ); insert into escloud_table select * from kafka_table;
目的 | 配置项 | 说明 |
---|---|---|
连接 TLS 主题 | connector | 指定使用的连接器。 |
topic | 指定读取日志数据的目标 TLS 主题,取值为 Kafka 协议消费主题 ID。 | |
properties.bootstrap.servers | 指定 TLS 日志项目的访问地址,格式为
| |
scan.startup.mode | 读取数据时的启动模式。 这里取值为 earliest-offset,表示从 Kafka 最早分区开始读取。 | |
format | 用来反序列化 Kafka 消息体(value)时使用的格式。此处设置为 json。 | |
properties.security.protocol | 使用 SASL_SSL 连接协议,保证日志传输安全。 | |
properties.sasl.mechanism | 将 SASL 机制配置为 PLAIN。 | |
properties.sasl.jaas.config | 配置 JAAS。
注意 建议配置为子用户(IAM 用户)的密钥,降低安全风险。 | |
连接 ESCloud Index | connector | 指定使用的连接器。 注意 这里的连接器版本需要和云搜索服务实例版本保持一致,以免出现版本冲突。 |
hosts | 实例访问地址。 | |
index | 数据写入的索引名称。 | |
username | 实例访问用户。 | |
password | 实例访问用户的密码。 |
本文创建了两个任务,一个负责生产消息,一个负责消费消息。您可以在 TLS 侧检索写入的数据,以验证生产消息;可以在 Kibana Web 页面查询 ESCloud Index 中是否有数据,以验证消费 TLS 日志。
负责生产消息的任务正常运行后,您可以前往日志服务控制台,检索 TLS 日志,从而判断是否写入了随机数据。
当消费信息的任务运行正常后,您可以登录 Kibana Web 页面,查询索引(test_orders_3
)中是否被写入 TLS 日志数据。
登录云搜索服务控制台。
在顶部导航栏选择目标地域。
在实例列表页面,选择目标实例,然后单击该示例后方的 Kibana。
在 Kibana 登录页面,输入用户名和密码,单击 Log In,登录 Kibana。
说明
用户名默认 admin,密码为创建实例时设置的密码。
在 Kibana Web 页面左侧导航栏选择 Dev Tools,然后在 Console 左侧代码区域输入GET /test_orders_3/_search
,再单击执行按钮。
当返回如下类似信息时,表示已成功从 BMQ Topic 中读取数据并写入到 ESCloud Index。
{ "took" : 545, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1410, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "test_orders_3", "_type" : "_doc", "_id" : "fBh-NYc**i2l", "_score" : 1.0, "_source" : { "order_id" : 9316, "order_product_id" : 956, "order_customer_id" : 798, "order_status" : "316", "order_update_time" : "2023-03-31 10:28:35.558" } }, ......仅展示部分数据。