You need to enable JavaScript to run this app.
导航
接入 Filebeat
最近更新时间:2024.09.19 10:43:37首次发布时间:2024.02.21 14:01:33

Filebeat 是用于转发和集中日志数据的轻量级传输程序,可以监听指定的日志文件或位置,从中收集日志事件并将其转发到 Elasticsearch 或 Logstash 进行索引。本文介绍在 Filebeat 中接入消息队列 Kafka版。

背景信息

Filebeat 的处理流程说明如下:

  1. Filebeat 启动一个或多个 Input,Input 在指定的位置中查找日志数据。
  2. Filebeat 会为每个找到的日志启动 Harvester,Harvester 读取日志并将日志数据发送到 libbeat。
  3. libbeat 聚集数据,然后将聚集的数据发送到配置的 Output。

图片
如需了解 Filebeat 更多信息,请参考开源文档Filebeat 概述Filebeat 快速入门-安装与配置Filebeat 工作原理

前提条件

本文介绍在 Filebeat 中接入消息队列 Kafka版,要求用于安装 Filebeat 的云服务器和 Kafka 实例两者的所处地域、所属 VPC 等信息相同,以保证网络畅通。

  • 创建私有网络和子网,操作步骤请参见创建私有网络创建子网
  • 创建云服务器,用于安装 Filebeat 和连接 Kafka 实例,操作步骤请参考购买云服务器
  • 云服务器上需要安装 Java 1.8 或以上版本 JDK,具体信息请参见安装JDK
  • 创建 Kafka 实例和 Topic,相关文档请参见创建实例创建 Topic
  • 如果您选择使用 SASL_SSL 接入点和 SASL_PLAINTEXT 接入点连接 Kafka 实例,则需要提前创建 SASL 用户。具体操作请参见创建 SASL 用户

步骤一:下载安装 Filebeat

  1. 连接 ECS 实例。连接方式请参见连接 ECS 实例

  2. 下载 Filebeat 压缩包。

    说明

    建议您下载安装 8.X 以上版本的 Filebeat,以避免因低版本无法识别大写 Topic 而出现发送消息的请求无法找到订阅的 Topic 的问题。
    本文以 Linux 系统安装 Filebeat 为例,如需了解其他系统如何安装和配置,请参见Filebeat 快速入门-安装与配置

    curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz
    
  3. 解压缩 Filebeat 文件。

    tar xzvf filebeat-8.12.0-linux-x86_64.tar.gz
    
  4. (可选)查看 Filebeat 文件。

    cd filebeat-8.12.0-linux-x86_64
    
    • filebeat :用于启动 Filebeat 的二进制文件。
      图片

步骤二:在 Filebeat 中接入 Kafka

在 Filebeat 中接入 Kafka 时,Kafka 支持作为 InputOutput,请根据实际场景查看相应文档。

Kafka 作为 Input

  1. 在 Kafka 客户端向目标 Topic 发送消息。
    请选择合适的方式将消息发送到目标 Topic,相关文档请参见使用默认接入点连接实例使用 SASL_PLAINTEXT 接入点连接实例使用 SASL_SSL 接入点连接实例
    使用默认接入点(PLAINTEXT)连接实例并发送消息的示例图如下:
    图片

  2. 在 Filebeat 中接入 Kafka,并消费目标 Topic 中的消息。

    1. 在 Filebeat 安装目录下,创建filebeat.yaml配置文件。

      touch filebeat.yaml
      
    2. 编辑filebeat.yaml配置文件,配置 Kafka 相关信息,实现从 Kafka 中读取数据。

      vim filebeat.yaml
      

      消息队列 Kafka版提供多种安全机制保障数据安全,不同接入点的配置有所不同,请根据实际使用场景查看对应的文档。

      • 使用 PLAINTEXT(默认)接入点
        配置文件内容如下:
      filebeat.inputs:
      - type: kafka
        # 替换为实际 Kafka 实例的默认接入点。
        hosts: ["127.0.0.1:9092"]
        # 客户端版本。
        version: "2.8.0"
        # 替换为需要读取数据的实际 topic。
        topics: ["topic-test"]
        # 消费组名称,Kafka 服务端会自动创建消费组。
        group_id: "group-test"
        fetch:
          min: 10240
       
      output.file:
        # 数据读取后写入的文件路径,替换为本地实际路径。
        path: "{$filepath}/output"
        # 数据读取后写入的文件名前缀。
        filename: "kafka"
      

      参数

      说明

      hosts

      Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点

      version

      使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。

      topics

      用于指定读取数据的 Topic 列表。

      group_id

      用于指定消费数据的消费组名称。
      Filebeat 使用的消费组无需手动创建,在客户端运行过程中会自动进行消费组创建。

      fetch.min

      用于指定从服务端单次拉取的消息大小。默认为 1,即单次从服务端最少拉取 1 字节的消息即可返回。建议适量增加单次拉取消息的最小数据量,可以有效降低读取压力。

      output.file.path

      设置数据读取后写入的文件路径,比如“/doc/output”。

      output.file.filename

      设置数据读取后写入的文件名的前缀。如果设置为“kafka”,则生成的文件名类似为kafka-20240206.ndjson

      • 使用 SASL_PLAINTEXT 接入点
        使用 SASL_PLAINTEXT 接入点时,需要在配置文件中指定接入协议和用户配置。Kafka 实例目前支持 PLAIN 和 SCRAM 两种不同的用户类型。
        配置文件内容如下:
      filebeat.inputs:
      - type: kafka
        # 替换为实际 Kafka 实例的 SASL_PLAINTEXT 接入点。
        hosts: ["127.0.0.1:9093"]
        # 客户端版本。
        version: "2.8.0"
        # 替换为读取数据的实际topic。
        topics: ["topic-test"]
        # 消费组名称,Kafka 服务端会自动创建消费组。
        group_id: "group-test"
        fetch:
          min: 10240
        
        # 替换为实际用户名和密码。
        username: "user-xxx"
        password: "password-xxx"
        # 用户类型,根据真实用户类型填写 PLAIN 或 SCRAM-SHA-256。
        sasl.mechanism: "PLAIN"
        
      output.file:
        # 数据读取后写入的文件路径,替换为本地实际路径。
        path: "{$filepath}/output"
        # 数据读取后写入的文件名前缀。
        filename: "kafka"
      

      参数

      说明

      hosts

      Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点

      version

      使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。

      topics

      用于指定读取数据的 Topic 列表。

      group_id

      用于指定消费数据的消费组名称。Filebeat 使用的消费组无需手动创建,在客户端运行过程中会自动进行消费组创建。

      fetch.min

      用于指定从服务端单次拉取的消息大小。默认为 1,即单次从服务端最少拉取 1 字节的消息即可返回。建议适量增加单次拉取消息的最小数据量,可以有效降低读取压力。

      username

      通过 SASL_PLAINTEXT 接入点连接实例时,需要通过 SASL 用户名及密码鉴权后才能访问 Kafka 实例。请填写 PLAIN 或 SCRAM 用户的名称。

      password

      请填写 PLAIN 或 SCRAM 用户的密码。

      sasl.mechanism

      用户类型,根据真实用户类型填写 PLAINSCRAM-SHA-256

      output.file.path

      设置数据读取后写入的文件路径,比如“/doc/output”。

      output.file.filename

      设置数据读取后写入的文件名的前缀。如果设置为“kafka”,则生成的文件名类似为kafka-20240206.ndjson

      • 使用 SASL_SSL 接入点
        使用 SASL_SSL 接入点时,需要在配置文件中指定接入协议和用户配置,以及需要开启 SSL 的加密传输。Kafka 实例目前支持 PLAIN 和 SCRAM 两种不同的用户类型。
        配置文件内容如下:
      filebeat.inputs:
      - type: kafka
        # 替换为实际 Kafka 实例的 SASL_SSL 接入点。
        hosts: ["127.0.0.1:9095"]
        # 客户端版本。
        version: "2.8.0"
        # 替换为读取数据的实际 topic。
        topics: ["topic-test"]
        # 消费组名称,Kafka 服务端会自动创建消费组。
        group_id: "group-test"
        fetch:
          min: 10240
         
        # 替换为实际用户名和密码。
        username: "user-xxx"
        password: "password-xxx"
        # 用户类型,根据真实用户类型填写 PLAIN 或 SCRAM-SHA-256。
        sasl.mechanism: "PLAIN"
        
        # 打开 SSL 配置。
        ssl.enabled: true
        # 关闭 SSL 的证书域名校验。
        ssl.verification_modeedit: none
        
      output.file:
        # 数据读取后写入的文件路径,替换为本地实际路径。
        path: "{$filepath}/output"
        # 数据读取后写入的文件名前缀。
        filename: "kafka"
      

      参数

      说明

      hosts

      Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点

      version

      使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。

      topics

      用于指定读取数据的 Topic 列表。

      group_id

      用于指定消费数据的消费组名称。Filebeat 使用的消费组无需手动创建,在客户端运行过程中会自动进行消费组创建。

      fetch.min

      用于指定从服务端单次拉取的消息大小。默认为 1,即单次从服务端最少拉取 1 字节的消息即可返回。建议适量增加单次拉取消息的最小数据量,可以有效降低读取压力。

      username

      通过 SASL_SSL 接入点连接实例时,需要通过 SASL 用户名及密码鉴权后才能访问 Kafka 实例。请填写 PLAIN 或 SCRAM 用户的名称。

      password

      请填写 PLAIN 或 SCRAM 用户的密码。

      sasl.mechanism

      用户类型,根据真实用户类型填写 PLAINSCRAM-SHA-256

      ssl.enabled

      使用 SASL_SSL 接入点时,必须打开 SSL 加密传输,因此固定配置为true

      ssl.verification_modeedit

      配置为none,表示关闭 SSL 证书域名校验。

      output.file.path

      设置数据读取后写入的文件路径,比如“/doc/output”。

      output.file.filename

      设置数据读取后写入的文件名的前缀。如果设置为“kafka”,则生成的文件名类似为kafka-20240206.ndjson

  3. 启动 Filebeat,读取 Kafka 目标 Topic 中的数据。

    ./filebeat -e -c ./filebeat.yaml
    

    图片

  4. 读取数据后,您可以进入到数据读取后写入的文件路径(比如“/doc/output”),查看获取到的数据文件。
    您可以查看到固定前缀(比如“kafka”)开头的数据文件。

    # 进入正确目录。
    cd /doc/output
    
    # 查看数据文件内容,替换为正确的文件名称,如 kafka2-20240206.ndjson。
    tail -f ${文件名称}
    

    图片

Kafka 作为 Output

  1. 创建数据源-日志文件。
    本文选择在安装 Filebeat 的云服务器的/doc/input路径下创建日志文件,并存入日志数据。
    示例:执行以下命令,创建日志文件并写入 3 条数据。

    # 进入目标目录。
    cd /doc/input
    
    # 创建日志文件 test.log 并写入数据。
    echo "test-msg-1" >> test.log
    echo "test-msg-2" >> test.log
    echo "test-msg-3" >> test.log
    

    说明

    日志文件中的内容只能被 Filebeat 读取并发送一次,如果要继续发送消息,您需要往日志文件中写入新的数据。

  2. 在 Filebeat 中接入 Kafka,读取本地日志文件内容写入 Kafka。

    1. 在 Filebeat 安装目录下,创建filebeat.yaml配置文件。

      touch filebeat.yaml
      
    2. 编辑filebeat.yaml配置文件,配置 Kafka 相关信息,实现读取本地日志文件内容并写入 Kafka。

      vim filebeat.yaml
      

      消息队列 Kafka版提供多种安全机制保障数据安全,不同接入点的配置有所不同,请根据实际使用场景查看对应的文档。

      • 使用 PLAINTEXT(默认)接入点
        配置文件内容如下:
      filebeat.inputs:
      - type: log
        # 定义需要读取的文件,替换为本地实际路径。
        paths:
          - {filepath}/test.log
       
      output.kafka:
        # 替换为实际 Kafka 实例的 PLAINTEXT 接入点。
        hosts: ["127.0.0.1:9092"]
        # 客户端版本.
        version: "2.8.0"
        # 数据写入的 Topic 名称。
        topic: "topic-test"
        # 可靠性配置。
        required_acks: -1
        # 分区选择策略。
        partition.round_robin:
          reachable_only: false
      

      参数

      说明

      paths

      需要读取的本地日志文件路径,比如“/doc/input”。

      hosts

      Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点

      version

      使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。

      topic

      用于指定写入数据的 Topic。

      required_acks

      用于指定写入数据的可靠性,默认为 -1。

      • 配置为0,表示可靠性最低,不关心数据的写入结果。
      • 配置为1,表示可靠性中等,数据写入主副本则认为写入成功。
      • 配置为-1,表示最高可靠性,数据写入当前正常运行的所有副本则认为写入成功。

      请根据业务实际场景配置写入数据的可靠性,建议使用可靠性中等及以上的配置。

      partition.round_robin

      指定数据分配方式。推荐使用round_roubin的方式,可以增加数据聚合,降低写入压力。
      除了 round_roubin 之外,还可以选择rangehash方式,默认为range

      partition.round_robin.reachable_only

      指定分区选择范围。默认值为false,数据写入时会从所有的分区中进行选择。当设置为true时仅会选择正常可用的分区写入。

      • 使用 SASL_PLAINTEXT 接入点
        使用 SASL_PLAINTEXT 接入点时,需要在配置文件中指定接入协议和用户配置。Kafka 实例目前支持 PLAIN 和 SCRAM 两种不同的用户类型。
        配置文件内容如下:
      filebeat.inputs:
      - type: log
        # 数据读取的文件定义,替换为本地实际路径。
        paths:
          - {filepath}/test.log
      
      output.kafka:
        # 替换为实际 Kafka 实例的 SASL_PLAINTEXT 接入点。
        hosts: ["127.0.0.1:9093"]
        # 客户端版本。
        version: "2.8.0"
        # 写入的Topic名称。
        topic: "topic-test"
        # 可靠性配置。
        required_acks: -1
        # 分区选择策略。
        partition.round_robin:
          reachable_only: false
       
        # 替换为实际用户名和密码。
        username: "user-xxx"
        password: "password-xxx"
        # 用户类型,根据真实用户类型填写 PLAIN 或 SCRAM-SHA-256。
        sasl.mechanism: "PLAIN"
      

      参数

      说明

      paths

      需要读取的本地日志文件路径,比如“/doc/input”。

      hosts

      Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点

      version

      使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。

      topic

      用于指定写入数据的 Topic。

      required_acks

      用于指定写入数据的可靠性,默认为 -1。

      • 配置为0,表示可靠性最低,不关心数据的写入结果。
      • 配置为1,表示可靠性中等,数据写入主副本则认为写入成功。
      • 配置为-1,表示最高可靠性,数据写入当前正常运行的所有副本则认为写入成功。

      请根据业务实际场景配置写入数据的可靠性,建议使用可靠性中等及以上的配置。

      partition.round_robin

      指定数据分配方式。推荐使用round_roubin的方式,可以增加数据聚合,降低写入压力。
      除了 round_roubin 之外,还可以选择rangehash方式,默认为range

      partition.round_robin.reachable_only

      指定分区选择范围。默认值为false,数据写入时会从所有的分区中进行选择。当设置为true时仅会选择正常可用的分区写入。

      username

      通过 SASL_PLAINTEXT 接入点连接实例时,需要通过 SASL 用户名及密码鉴权后才能访问 Kafka 实例。请填写 PLAIN 或 SCRAM 用户的名称。

      password

      请填写 PLAIN 或 SCRAM 用户的密码。

      sasl.mechanism

      用户类型,根据真实用户类型填写 PLAINSCRAM-SHA-256

      • 使用 SASL_SSL 接入点
        使用 SASL_SSL 接入点时,需要在配置文件中指定接入协议和用户配置,以及需要开启 SSL 的加密传输。Kafka 实例目前支持 PLAIN 和 SCRAM 两种不同的用户类型。
        配置文件内容如下:
      filebeat.inputs:
      - type: log
        # 数据读取的文件定义,替换为本地实际路径。
        paths:
          - {filepath}/test.log
      
      output.kafka:
        # 替换为实际 Kafka 实例的 SASL_SSL 接入点。
        hosts: ["127.0.0.1:9095"]
        # 客户端版本。
        version: "2.8.0"
        # 写入数据的 Topic 名称。
        topic: "topic-test"
        # 可靠性配置。
        required_acks: -1
        # 分区选择策略。
        partition.round_robin:
          reachable_only: false 
        
        # 替换为实际用户名和密码。
        username: "user-xxx"
        password: "password-xxx"
        # 用户类型,根据真实用户类型填写 PLAIN 或 SCRAM-SHA-256。
        sasl.mechanism: "PLAIN"
        
        # 打开SSL配置。
        ssl.enabled: true
        # 关闭SSL的证书域名校验。
        ssl.verification_modeedit: none
      

      参数

      说明

      paths

      需要读取的本地日志文件路径。

      hosts

      Kafka 实例的接入点地址,支持配置多个不同协议的接入点。如何查看实例接入点,请参见查看接入点

      version

      使用的 Kafka 客户端版本。建议根据购买的服务端(Kafka 实例)版本进行配置,购买 2.2.2 版本实例则配置为"2.2.0";购买 2.8.2 版本实例则配置为"2.8.0"。

      topic

      用于指定写入数据的 Topic。

      required_acks

      用于指定写入数据的可靠性,默认为 -1。

      • 配置为0,表示可靠性最低,不关心数据的写入结果.
      • 配置为1,表示可靠性中等,数据写入主副本则认为写入成功。
      • 配置为-1,表示最高可靠性,数据写入当前正常运行的所有副本则认为写入成功。

      请根据业务实际场景配置写入数据的可靠性,建议使用可靠性中等及以上的配置。

      partition.round_robin

      指定数据分配方式。推荐使用round_roubin的方式,可以增加数据聚合,降低写入压力。
      除了 round_roubin 之外,还可以选择rangehash方式,默认为range

      partition.round_robin.reachable_only

      指定分区选择范围。默认值为false,数据写入时会从所有的分区中进行选择。当设置为true时仅会选择正常可用的分区写入。

      username

      通过 SASL_SSL 接入点连接实例时,需要通过 SASL 用户名及密码鉴权后才能访问 Kafka 实例。请填写 PLAIN 或 SCRAM 用户的名称。

      password

      请填写 PLAIN 或 SCRAM 用户的密码。

      sasl.mechanism

      用户类型,根据真实用户类型填写 PLAINSCRAM-SHA-256

      ssl.enabled

      使用 SASL_SSL 接入点时,必须打开 SSL 加密传输,因此固定配置为true

      ssl.verification_modeedit

      配置为none,表示关闭 SSL 证书域名校验。

  3. 启动 Filebeat,读取本地日志文件数据并写入 Kafka。

    ./filebeat -e -c ./filebeat.yaml
    

    图片

  4. 在 Kafka 客户端消费目标 Topic 中的数据。
    请选择合适的方式从目标 Topic 中消费数据,相关文档请参见使用默认接入点连接实例使用 SASL_PLAINTEXT 接入点连接实例使用 SASL_SSL 接入点连接实例
    图片