You need to enable JavaScript to run this app.
导航
使用 Kafka 协议上传日志
最近更新时间:2025.01.08 17:11:15首次发布时间:2022.10.14 11:41:16

日志服务支持通过 Kafka 协议上传日志数据到服务端,即可以使用 Kafka Producer SDK 来采集日志数据,并通过 Kafka 协议上传到日志服务。本文介绍通过 Kafka 协议将日志上传到日志服务的操作步骤。

背景信息

Kafka 作为高吞吐量的消息中间件,在多种自建场景的日志采集方案中被用于消息管道。例如在日志源服务器中的开源采集工具采集日志,或通过 Producer 直接写入日志数据,再通过消费管道供下游应用进行消费。日志服务支持通过 Kafka 协议上传和消费日志数据,基于 Kafka 数据管道提供完整的数据上下行服务。
使用 Kafka 协议上传日志功能,无需手动开启功能,无需在数据源侧安装数据采集工具,基于简单的配置即可实现 Kafka Producer 采集并上传日志信息到日志服务。日志服务提供基于 Java 和 Go 语言的示例项目供您参考,详细信息请参考示例
通过 Kafka 协议采集日志时,对于合法的 JSON 格式日志,日志服务会正常解析为 Key-Value 对;对于不合法的 JSON 格式,部分字段可能出现会解析错乱的情况;对于其他格式的日志数据,原始日志全文会以字符串格式被统一封装在字段 __content__ 中。

说明

通过 Kafka 协议解析 JSON 格式日志时,最多支持一层扩展,包含多层嵌套的日志字段将被作为一个字符串进行采集和保存。

限制说明

  • 支持的 Kafka 协议版本为 0.11.x~2.0.x。
  • 支持压缩方式包括 gzip、snappy 和 lz4。
  • 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥,详细信息请参考示例
  • 如果日志主题中有多个 Shard,日志服务不保证数据的有序性,建议使用负载均衡模式上传日志。
  • 当使用 Kafka Producer Batch 打包发送数据的时候,一次 Batch 数据的大小不能超过 5MiB,一条消息的大小上限是 5MiB,一个 Batch 请求中消息条数不能超过 10000 条,服务端会对每次 Producer 请求写入的日志数据进行长度检查,如果超出限制则整个请求失败且无任何日志数据成功写入。

前提条件

  • 已开通日志服务,创建日志项目与日志主题,并成功采集到日志数据。详细说明请参考快速入门
  • 确保当前操作账号拥有开通 Kafka 协议上传日志的权限,即具备 Action PutLogs 的权限。详细信息请参考可授权的操作

参数说明

使用 Kafka 协议上传日志时,您需要配置以下参数。

参数

示例

说明

连接类型

SASL_SSL

为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥。

username

c8f20efe-405f-4d57-98cf-8c58d890****

Kafka SASL 用户名。应配置为日志服务的日志项目 ID。

password

AKLTYmQzOWUzMWx*******#WVRnM05UWTRaVGhrTUdFNE5EazNZV0kyTjJRME********

Kafka SASL 用户密码。应配置为火山引擎账户密钥。
格式为 ${access-key-id}#${access-key-secret},其中:

  • ${access-key-id} 应替换为您的 AccessKey ID。
  • ${access-key-secret} 应替换为您的 AccessKey Secret。

说明

建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作

hosts

  • 公网:tls-cn-beijing.volces.com:9094
  • 私网:tls-cn-beijing.ivolces.com:9094

初始连接的集群地址,格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9094,其中:

  • 服务地址为当前地域下日志服务的服务入口。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址
  • 端口号固定为 9094。

说明

hosts 中的服务地址部分无需指定 https://

topic

20a50a35-304a-4c01-88d2-23349c30****

配置为日志服务的日志主题 ID。

为了获得更好的性能,建议添加以下 Kafka producer 参数。

参数

推荐值

说明

batch.size

  • 设置了压缩:262144
  • 未设置压缩:2621440

用于控制生产者发送单个分区消息时的批处理大小。单位:字节。

说明

批次中的消息总大小达到 batch.size 或者生产者发送消息的等待时间达到 linger.ms,生产者都会立即发送当前批次消息。

max.request.size

  • 设置了压缩:1046576
  • 未设置压缩:10465760

用于指定单个请求(一个请求中可能有多个分区)的最大大小。单位:字节。

linger.ms

100

用于指定生产者在发送当前批次消息前的最长等待时间,单位:毫秒。如果不配置,表示立即发送。

compression.type

lz4

用于指定消息在发送到 Kafka 之前的压缩算法。

配置步骤

日志服务支持自动生成 Kafka 协议上传日志数据的相关参数配置。根据以下操作步骤填写上传日志的各项基础配置和数据源信息后,控制台页面将自动生成对应的参数配置列表供您参考。

  1. 登录日志服务控制台

  2. 在顶部导航栏中,选择日志服务所在的地域。

  3. 在左侧导航栏中,选择常用功能 > 日志接入

  4. 数据导入页签中,单击Kafka协议写入

  5. 填写基础信息配置,并单击下一步

    配置

    说明

    日志项目

    用于存储 Kafka 数据的日志项目。

    日志主题

    用于存储 Kafka 数据的的日志主题。

  6. 填写数据源配置,并单击下一步

    配置

    说明

    密钥

    火山引擎账户密钥,包括 AccessKey ID 和 AccessKey Secret。您可以参考页面提示获取密钥。

    说明

    建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作

    网络连接方式

    选择通过内网公网连接日志服务并上传日志。若选择内网,请确认数据源可通过火山引擎内网正常访问日志服务。

    日志压缩方式

    原始日志的压缩方式。目前支持的压缩方式包括 gzip、snappy 和 lz4。

    Kafka生产端

    日志数据源类型。目前支持 Kafka 开源 SDK 或 Logstash 通过 Kafka 协议上传日志。

    结果预览

    日志服务根据以上参数配置自动生成对应的示例供您参考。

    • Kafka 开源 SDK:直接复制各个配置项的取值,并将其填写在 Kafka 开源 SDK 的对应参数中。完整的代码示例请参考示例代码
    • Logstash:日志服务自动生成 Logstash 的 Kafka 插件配置,测试插件连通性。详细说明请参考通过 Logstash 上传日志

    结果预览示例如下:
    Image

    Image

  7. 设置索引,并单击提交
    设置索引后,采集到服务端的日志数据才能被检索分析。设置索引的详细说明请参考配置索引

    说明

    完成任务配置后,任务列表中不会展示已配置的任务,您可以打开目标日志主题查看数据。

示例

通过 Logstash 上传日志

Logstash 内置 Kafka 输出插件(logstash_output_kafka),可以通过 Kafka 协议采集数据。在通过 ELK 搭建日志采集分析系统的场景下,只需修改 Logstash 配置文件,实现通过 Kafka 协议上传日志数据到日志服务。
通过日志服务控制台自动生成 Logstash 的 Kafka 插件配置示例后,可以通过该示例测试插件连通性。其中,${hosts} 等参数说明请参考配置步骤中的结果预览

说明

  • 日志服务使用 SASL_SSL 连接协议,因此 Kakfa 需要通过 SASL_SSL 接入点实现消息的收发机制。
  • 支持的 Logstash 版本为 7.12~8.8.1。如果您需要使用其他 Logstash 版本,可以通过工单系统联系技术支持沟通业务需求。
  • 建议在测试阶段通过以下配置测试插件连通性,生产环境中需要删除其中 stdout 相关的输出配置。
  • 对于 JSON 格式的日志,建议在配置中将日志输出格式设置为 JSON(codec => json),否则会导致上传的数据不完整。日志服务会自动解析并结构化 JSON 格式日志数据。

通过 Kafka Java SDK 上传日志

通过简单的参数配置,即可使用各类 Kafka Producer SDK 采集日志数据,并通过 Kafka 协议上传到日志服务。通过 Kafka Java SDK 上传日志的相关依赖及示例代码如下:

  1. 添加依赖。
    在 pom 文件中添加 kafka-clients 的相关依赖。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
    </dependency>
    
  2. 上传日志。
    参考以下示例代码通过 Kafka Java SDK 上传日志。

    说明

    执行以下示例代码之前请参考配置步骤中的结果预览正确填写 userName 等参数配置。

    package bytedance;
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.config.SaslConfigs;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    //    pom.xml依赖
    //    <dependency>
    //      <groupId>org.apache.kafka</groupId>
    //      <artifactId>kafka-clients</artifactId>
    //      <version>2.2.2</version> <!-- 请根据需要选择合适的版本 -->
    //    </dependency>
    
    public class KafkaProducer {
        public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
            Properties props = new Properties();
    
            // TLS 连接以及鉴权信息配置。
            String userName = "${projectId}";
            // 火山引擎账号的密钥,或具备对应权限的 IAM 账号密钥。不支持STS临时安全令牌。
            String passWord = "${AK}#${SK}";
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "${hosts}");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule " +
                            "required username=\"" + userName + "\" password=\"" + passWord + "\";");
    
            // producer 参数配置。
            props.put(ProducerConfig.ACKS_CONFIG, "1"); // NoResponse 0;WaitForLocal 1;WaitForAll -1
            props.put(CommonClientConfigs.RETRIES_CONFIG, 5);
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 配置压缩方式。
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, "262144");
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1046576");
            // 1.创建一个生产者对象。
            Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
            // 2.调用 send 方法。
            Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicId}", "testMessageValue"));
            RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS);
            // 默认返回的 offset 为 -1。
            System.out.println("partition=" + recordMetadata.partition() + "offset = " + recordMetadata.offset());
            // 3.关闭生产者。
            System.out.println("Kafka Producer 是异步发送数据,建议等待 10s 后再关闭进程,以确保所有的数据都成功发送,否则会导致上传的数据不完整。");
            Thread.sleep(10000);
            // 在生产环境中,producer 应该常驻在进程中。只有在进程退出时,才调用 close 方法关闭 producer。
            producer.close();
        }
    }
    

通过 Kafka Go SDK 上传日志

通过简单的参数配置,即可使用各类 Kafka Producer SDK 采集日志数据,并通过 Kafka 协议上传到日志服务。通过 Kafka Go SDK 上传日志的相关依赖和示例代码如下:

  1. 执行以下命令安装 Sarama。

    go get github.com/Shopify/sarama@v1.38.1
    
  2. 导入 Sarama 等必要的依赖包,并上传日志。
    参考以下示例代码通过 Kafka Go SDK 上传日志。

    说明

    执行以下示例代码之前请参考配置步骤中的结果预览正确填写 User 等参数配置。

    package kafka_produce_example
    
    import (
        "fmt"
        "time"
    
        "github.com/Shopify/sarama"
    )
    
    func ExampleProduce() {
            config := sarama.NewConfig()
            config.Version = sarama.V2_0_0_0 // 指定合适的版本。
            config.ApiVersionsRequest = true
            config.Net.SASL.Mechanism = "PLAIN"
            config.Producer.Return.Successes = true
            config.Net.SASL.Version = int16(0)
            config.Net.SASL.Enable = true
            // 日志项目 ID。
            config.Net.SASL.User = "${projectId}"
            // 火山引擎账号的密钥,或具备对应权限的 IAM 账号密钥。不支持 STS 临时安全令牌。
            config.Net.SASL.Password = "${access-key-id}#${access-key-secret}"
            config.Producer.RequiredAcks = sarama.WaitForLocal
            config.Producer.Compression = sarama.CompressionLZ4
            config.Producer.Flush.Bytes = 262144
            config.Producer.Flush.Frequency = 100 * time.Millisecond
    
            config.Net.TLS.Enable = true
    
            // hosts 为生产的地址。具体说明,请参考本文中的参数说明。
            asyncProducer, err := sarama.NewAsyncProducer([]string{"${hosts}"}, config)
            if err != nil {
                    fmt.Println("new producer error:" + err.Error())
                    panic(err)
            }
    
            msg := &sarama.ProducerMessage{
                    // ${topicID} 为日志服务的日志主题 ID,例如 0fdaa6b6-3c9f-424c-8664-fc0d222c****。
                    Topic: "${topicID}",
                    Value: sarama.StringEncoder("test"),
            }
            asyncProducer.Input() <- msg
            go func() {
                    for {
                            select {
                            case msg := <-asyncProducer.Successes():
                                    if msg != nil {
                                            // offset 默认返回为 -1。
                                            fmt.Printf("send response; partition:%d, offset:%d\n", msg.Partition, msg.Offset)
                                    }
                            case msg := <-asyncProducer.Errors():
                                    if msg != nil {
                                            fmt.Println("producer send error:" + msg.Error())
                                    }
                            }
                    }
            }()
    
            time.Sleep(time.Minute)
            _ = asyncProducer.Close()
    }
    

错误信息

使用 Kafka 协议上传日志失败时,会按照 Kafka 的错误码返回对应的错误信息,请参考 Kafka error list获取更多信息。
除此之外,日志服务还在 Java 语言的 Kafka 错误码 SASLAuthenticationException 中封装了鉴权、配置相关参数的错误信息,详细说明如下:

错误信息

说明

invalid SASL/PLAIN request: expected 3 tokens

未配置 user 或者 password。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: empty projectId

未配置 user 字段。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: Password format wrong

password 字段配置错误。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: empty AccessKey

未配置 AK。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: empty SecretKey

未配置 SK。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: Invalid projectId

指定的 Project ID 不存在。请检查 Project ID 是否输入正确。

Access Denied. You are not authorized to perform this operation.

当前用户不具备操作权限。建议检查用户权限。