Filebeat 是用于转发和集中日志数据的轻量级传输程序,可以监听指定的日志文件或位置,从中收集日志事件并将其转发到 Elasticsearch 或 Logstash 进行索引。本文介绍在 Filebeat 中接入消息队列 Kafka版。
Filebeat 的处理流程说明如下:
如需了解 Filebeat 更多信息,请参考开源文档Filebeat 概述、Filebeat 快速入门-安装与配置、Filebeat 工作原理。
本文介绍在 Filebeat 中接入消息队列 Kafka版,要求用于安装 Filebeat 的云服务器和 Kafka 实例两者的所处地域、所属 VPC 等信息相同,以保证网络畅通。
连接 ECS 实例。连接方式请参见连接 ECS 实例。
下载 Filebeat 压缩包。
说明
建议您下载安装 8.X
以上版本的 Filebeat,以避免因低版本无法识别大写 Topic 而出现发送消息的请求无法找到订阅的 Topic 的问题。
本文以 Linux 系统安装 Filebeat 为例,如需了解其他系统如何安装和配置,请参见Filebeat 快速入门-安装与配置。
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz
解压缩 Filebeat 文件。
tar xzvf filebeat-8.12.0-linux-x86_64.tar.gz
(可选)查看 Filebeat 文件。
cd filebeat-8.12.0-linux-x86_64
在 Filebeat 中接入 Kafka 时,Kafka 支持作为 Input 或 Output,请根据实际场景查看相应文档。
在 Kafka 客户端向目标 Topic 发送消息。
请选择合适的方式将消息发送到目标 Topic,相关文档请参见使用默认接入点连接实例、使用 SASL_PLAINTEXT 接入点连接实例或使用 SASL_SSL 接入点连接实例。
使用默认接入点(PLAINTEXT)连接实例并发送消息的示例图如下:
在 Filebeat 中接入 Kafka,并消费目标 Topic 中的消息。
在 Filebeat 安装目录下,创建filebeat.yaml
配置文件。
touch filebeat.yaml
编辑filebeat.yaml
配置文件,配置 Kafka 相关信息,实现从 Kafka 中读取数据。
vim filebeat.yaml
消息队列 Kafka版提供多种安全机制保障数据安全,不同接入点的配置有所不同,请根据实际使用场景查看对应的文档。
filebeat.inputs: - type: kafka # 替换为实际 Kafka 实例的默认接入点。 hosts: ["127.0.0.1:9092"] # 客户端版本。 version: "2.8.0" # 替换为需要读取数据的实际 topic。 topics: ["topic-test"] # 消费组名称,Kafka 服务端会自动创建消费组。 group_id: "group-test" fetch: min: 10240 output.file: # 数据读取后写入的文件路径,替换为本地实际路径。 path: "{$filepath}/output" # 数据读取后写入的文件名前缀。 filename: "kafka"
参数 | 说明 |
---|---|
hosts | Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点。 |
version | 使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。 |
topics | 用于指定读取数据的 Topic 列表。 |
group_id | 用于指定消费数据的消费组名称。 |
fetch.min | 用于指定从服务端单次拉取的消息大小。默认为 1,即单次从服务端最少拉取 1 字节的消息即可返回。建议适量增加单次拉取消息的最小数据量,可以有效降低读取压力。 |
output.file.path | 设置数据读取后写入的文件路径,比如“ |
output.file.filename | 设置数据读取后写入的文件名的前缀。如果设置为“kafka”,则生成的文件名类似为 |
filebeat.inputs: - type: kafka # 替换为实际 Kafka 实例的 SASL_PLAINTEXT 接入点。 hosts: ["127.0.0.1:9093"] # 客户端版本。 version: "2.8.0" # 替换为读取数据的实际topic。 topics: ["topic-test"] # 消费组名称,Kafka 服务端会自动创建消费组。 group_id: "group-test" fetch: min: 10240 # 替换为实际用户名和密码。 username: "user-xxx" password: "password-xxx" # 用户类型,根据真实用户类型填写 PLAIN 或 SCRAM-SHA-256。 sasl.mechanism: "PLAIN" output.file: # 数据读取后写入的文件路径,替换为本地实际路径。 path: "{$filepath}/output" # 数据读取后写入的文件名前缀。 filename: "kafka"
参数 | 说明 |
---|---|
hosts | Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点。 |
version | 使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。 |
topics | 用于指定读取数据的 Topic 列表。 |
group_id | 用于指定消费数据的消费组名称。Filebeat 使用的消费组无需手动创建,在客户端运行过程中会自动进行消费组创建。 |
fetch.min | 用于指定从服务端单次拉取的消息大小。默认为 1,即单次从服务端最少拉取 1 字节的消息即可返回。建议适量增加单次拉取消息的最小数据量,可以有效降低读取压力。 |
username | 通过 SASL_PLAINTEXT 接入点连接实例时,需要通过 SASL 用户名及密码鉴权后才能访问 Kafka 实例。请填写 PLAIN 或 SCRAM 用户的名称。 |
password | 请填写 PLAIN 或 SCRAM 用户的密码。 |
sasl.mechanism | 用户类型,根据真实用户类型填写 |
output.file.path | 设置数据读取后写入的文件路径,比如“ |
output.file.filename | 设置数据读取后写入的文件名的前缀。如果设置为“kafka”,则生成的文件名类似为 |
filebeat.inputs: - type: kafka # 替换为实际 Kafka 实例的 SASL_SSL 接入点。 hosts: ["127.0.0.1:9095"] # 客户端版本。 version: "2.8.0" # 替换为读取数据的实际 topic。 topics: ["topic-test"] # 消费组名称,Kafka 服务端会自动创建消费组。 group_id: "group-test" fetch: min: 10240 # 替换为实际用户名和密码。 username: "user-xxx" password: "password-xxx" # 用户类型,根据真实用户类型填写 PLAIN 或 SCRAM-SHA-256。 sasl.mechanism: "PLAIN" # 打开 SSL 配置。 ssl.enabled: true # 关闭 SSL 的证书域名校验。 ssl.verification_modeedit: none output.file: # 数据读取后写入的文件路径,替换为本地实际路径。 path: "{$filepath}/output" # 数据读取后写入的文件名前缀。 filename: "kafka"
参数 | 说明 |
---|---|
hosts | Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点。 |
version | 使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。 |
topics | 用于指定读取数据的 Topic 列表。 |
group_id | 用于指定消费数据的消费组名称。Filebeat 使用的消费组无需手动创建,在客户端运行过程中会自动进行消费组创建。 |
fetch.min | 用于指定从服务端单次拉取的消息大小。默认为 1,即单次从服务端最少拉取 1 字节的消息即可返回。建议适量增加单次拉取消息的最小数据量,可以有效降低读取压力。 |
username | 通过 SASL_SSL 接入点连接实例时,需要通过 SASL 用户名及密码鉴权后才能访问 Kafka 实例。请填写 PLAIN 或 SCRAM 用户的名称。 |
password | 请填写 PLAIN 或 SCRAM 用户的密码。 |
sasl.mechanism | 用户类型,根据真实用户类型填写 |
ssl.enabled | 使用 SASL_SSL 接入点时,必须打开 SSL 加密传输,因此固定配置为 |
ssl.verification_modeedit | 配置为 |
output.file.path | 设置数据读取后写入的文件路径,比如“ |
output.file.filename | 设置数据读取后写入的文件名的前缀。如果设置为“kafka”,则生成的文件名类似为 |
启动 Filebeat,读取 Kafka 目标 Topic 中的数据。
./filebeat -e -c ./filebeat.yaml
读取数据后,您可以进入到数据读取后写入的文件路径(比如“/doc/output
”),查看获取到的数据文件。
您可以查看到固定前缀(比如“kafka
”)开头的数据文件。
# 进入正确目录。 cd /doc/output # 查看数据文件内容,替换为正确的文件名称,如 kafka2-20240206.ndjson。 tail -f ${文件名称}
创建数据源-日志文件。
本文选择在安装 Filebeat 的云服务器的/doc/input
路径下创建日志文件,并存入日志数据。
示例:执行以下命令,创建日志文件并写入 3 条数据。
# 进入目标目录。 cd /doc/input # 创建日志文件 test.log 并写入数据。 echo "test-msg-1" >> test.log echo "test-msg-2" >> test.log echo "test-msg-3" >> test.log
说明
日志文件中的内容只能被 Filebeat 读取并发送一次,如果要继续发送消息,您需要往日志文件中写入新的数据。
在 Filebeat 中接入 Kafka,读取本地日志文件内容写入 Kafka。
在 Filebeat 安装目录下,创建filebeat.yaml
配置文件。
touch filebeat.yaml
编辑filebeat.yaml
配置文件,配置 Kafka 相关信息,实现读取本地日志文件内容并写入 Kafka。
vim filebeat.yaml
消息队列 Kafka版提供多种安全机制保障数据安全,不同接入点的配置有所不同,请根据实际使用场景查看对应的文档。
filebeat.inputs: - type: log # 定义需要读取的文件,替换为本地实际路径。 paths: - {filepath}/test.log output.kafka: # 替换为实际 Kafka 实例的 PLAINTEXT 接入点。 hosts: ["127.0.0.1:9092"] # 客户端版本. version: "2.8.0" # 数据写入的 Topic 名称。 topic: "topic-test" # 可靠性配置。 required_acks: -1 # 分区选择策略。 partition.round_robin: reachable_only: false
参数 | 说明 |
---|---|
paths | 需要读取的本地日志文件路径,比如“ |
hosts | Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点。 |
version | 使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。 |
topic | 用于指定写入数据的 Topic。 |
required_acks | 用于指定写入数据的可靠性,默认为 -1。
请根据业务实际场景配置写入数据的可靠性,建议使用可靠性中等及以上的配置。 |
partition.round_robin | 指定数据分配方式。推荐使用 |
partition.round_robin.reachable_only | 指定分区选择范围。默认值为 |
filebeat.inputs: - type: log # 数据读取的文件定义,替换为本地实际路径。 paths: - {filepath}/test.log output.kafka: # 替换为实际 Kafka 实例的 SASL_PLAINTEXT 接入点。 hosts: ["127.0.0.1:9093"] # 客户端版本。 version: "2.8.0" # 写入的Topic名称。 topic: "topic-test" # 可靠性配置。 required_acks: -1 # 分区选择策略。 partition.round_robin: reachable_only: false # 替换为实际用户名和密码。 username: "user-xxx" password: "password-xxx" # 用户类型,根据真实用户类型填写 PLAIN 或 SCRAM-SHA-256。 sasl.mechanism: "PLAIN"
参数 | 说明 |
---|---|
paths | 需要读取的本地日志文件路径,比如“ |
hosts | Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点。 |
version | 使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。 |
topic | 用于指定写入数据的 Topic。 |
required_acks | 用于指定写入数据的可靠性,默认为 -1。
请根据业务实际场景配置写入数据的可靠性,建议使用可靠性中等及以上的配置。 |
partition.round_robin | 指定数据分配方式。推荐使用 |
partition.round_robin.reachable_only | 指定分区选择范围。默认值为 |
username | 通过 SASL_PLAINTEXT 接入点连接实例时,需要通过 SASL 用户名及密码鉴权后才能访问 Kafka 实例。请填写 PLAIN 或 SCRAM 用户的名称。 |
password | 请填写 PLAIN 或 SCRAM 用户的密码。 |
sasl.mechanism | 用户类型,根据真实用户类型填写 |
filebeat.inputs: - type: log # 数据读取的文件定义,替换为本地实际路径。 paths: - {filepath}/test.log output.kafka: # 替换为实际 Kafka 实例的 SASL_SSL 接入点。 hosts: ["127.0.0.1:9095"] # 客户端版本。 version: "2.8.0" # 写入数据的 Topic 名称。 topic: "topic-test" # 可靠性配置。 required_acks: -1 # 分区选择策略。 partition.round_robin: reachable_only: false # 替换为实际用户名和密码。 username: "user-xxx" password: "password-xxx" # 用户类型,根据真实用户类型填写 PLAIN 或 SCRAM-SHA-256。 sasl.mechanism: "PLAIN" # 打开SSL配置。 ssl.enabled: true # 关闭SSL的证书域名校验。 ssl.verification_modeedit: none
参数 | 说明 |
---|---|
paths | 需要读取的本地日志文件路径。 |
hosts | Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点。 |
version | 使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。 |
topic | 用于指定写入数据的 Topic。 |
required_acks | 用于指定写入数据的可靠性,默认为 -1。
请根据业务实际场景配置写入数据的可靠性,建议使用可靠性中等及以上的配置。 |
partition.round_robin | 指定数据分配方式。推荐使用 |
partition.round_robin.reachable_only | 指定分区选择范围。默认值为 |
username | 通过 SASL_SSL 接入点连接实例时,需要通过 SASL 用户名及密码鉴权后才能访问 Kafka 实例。请填写 PLAIN 或 SCRAM 用户的名称。 |
password | 请填写 PLAIN 或 SCRAM 用户的密码。 |
sasl.mechanism | 用户类型,根据真实用户类型填写 |
ssl.enabled | 使用 SASL_SSL 接入点时,必须打开 SSL 加密传输,因此固定配置为 |
ssl.verification_modeedit | 配置为 |
启动 Filebeat,读取本地日志文件数据并写入 Kafka。
./filebeat -e -c ./filebeat.yaml
在 Kafka 客户端消费目标 Topic 中的数据。
请选择合适的方式从目标 Topic 中消费数据,相关文档请参见使用默认接入点连接实例、使用 SASL_PLAINTEXT 接入点连接实例或使用 SASL_SSL 接入点连接实例。