日志服务支持通过 Kafka 协议上传日志数据到服务端,即可以使用 Kafka Producer SDK 来采集日志数据,并通过 Kafka 协议上传到日志服务。本文介绍通过 Kafka 协议将日志上传到日志服务的操作步骤。
Kafka 作为高吞吐量的消息中间件,在多种自建场景的日志采集方案中被用于消息管道。例如在日志源服务器中的开源采集工具采集日志,或通过 Producer 直接写入日志数据,再通过消费管道供下游应用进行消费。日志服务支持通过 Kafka 协议上传和消费日志数据,基于 Kafka 数据管道提供完整的数据上下行服务。
使用 Kafka 协议上传日志功能,无需手动开启功能,无需在数据源侧安装数据采集工具,基于简单的配置即可实现 Kafka Producer 采集并上传日志信息到日志服务。日志服务提供基于 Java 和 Go 语言的示例项目供您参考,详细信息请参考示例。
通过 Kafka 协议采集日志时,对于合法的 JSON 格式日志,日志服务会正常解析为 Key-Value 对;对于不合法的 JSON 格式,部分字段可能出现会解析错乱的情况;对于其他格式的日志数据,原始日志全文会以字符串格式被统一封装在字段 __content__
中。
说明
通过 Kafka 协议解析 JSON 格式日志时,最多支持一层扩展,包含多层嵌套的日志字段将被作为一个字符串进行采集和保存。
PutLogs
的权限。详细信息请参考可授权的操作。使用 Kafka 协议上传日志时,您需要配置以下参数。
参数 | 示例 | 说明 |
---|---|---|
连接类型 |
| 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥。 |
username |
| Kafka SASL 用户名。应配置为日志服务的日志项目 ID。 |
password |
| Kafka SASL 用户密码。应配置为火山引擎账户密钥。
说明 建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作。 |
hosts |
| 初始连接的集群地址,格式为
说明 hosts 中的服务地址部分无需指定 |
topic |
| 配置为日志服务的日志主题 ID。 |
日志服务支持自动生成 Kafka 协议上传日志数据的相关参数配置。根据以下操作步骤填写上传日志的各项基础配置和数据源信息后,控制台页面将自动生成对应的参数配置列表供您参考。
登录日志服务控制台。
在顶部导航栏中,选择日志服务所在的地域。
在左侧导航栏中,选择常用功能 > 日志接入。
在数据导入页签中,单击Kafka协议写入。
填写基础信息配置,并单击下一步。
配置 | 说明 |
---|---|
日志项目 | 用于存储 Kafka 数据的日志项目。 |
日志主题 | 用于存储 Kafka 数据的的日志主题。 |
填写数据源配置,并单击下一步。
配置 | 说明 |
---|---|
密钥 | 火山引擎账户密钥,包括 AccessKey ID 和 AccessKey Secret。您可以参考页面提示获取密钥。 说明 建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作。 |
网络连接方式 | 选择通过内网或公网连接日志服务并上传日志。若选择内网,请确认数据源可通过火山引擎内网正常访问日志服务。 |
日志压缩方式 | 原始日志的压缩方式。目前支持的压缩方式包括 gzip、snappy 和 lz4。 |
Kafka生产端 | 日志数据源类型。目前支持 Kafka 开源 SDK 或 Logstash 通过 Kafka 协议上传日志。 |
结果预览 | 日志服务根据以上参数配置自动生成对应的示例供您参考。
|
结果预览示例如下:
设置索引,并单击提交。
设置索引后,采集到服务端的日志数据才能被检索分析。设置索引的详细说明请参考配置索引。
Logstash 内置 Kafka 输出插件(logstash_output_kafka),可以通过 Kafka 协议采集数据。在通过 ELK 搭建日志采集分析系统的场景下,只需修改 Logstash 配置文件,实现通过 Kafka 协议上传日志数据到日志服务。
通过日志服务控制台自动生成 Logstash 的 Kafka 插件配置示例后,可以通过该示例测试插件连通性。其中,${hosts}
等参数说明请参考配置步骤中的结果预览。
说明
通过简单的参数配置,即可使用各类 Kafka Producer SDK 采集日志数据,并通过 Kafka 协议上传到日志服务。通过 Kafka Java SDK 上传日志的相关依赖及示例代码如下:
添加依赖。
在 pom 文件中添加 kafka-clients 的相关依赖。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.2</version> </dependency>
上传日志。
参考以下示例代码通过 Kafka Java SDK 上传日志。
说明
执行以下示例代码之前请参考配置步骤中的结果预览正确填写 userName
等参数配置。
package org.kafka; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.config.SaslConfigs; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class ProducerBatchDemo { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { Properties props = new Properties(); String userName = "${projectId}"; //火山引擎账号的密钥,或具备对应权限的子账号密钥。不支持STS临时安全令牌。 String passWord = "${AK}#${SK}"; props.put("bootstrap.servers", "${hosts}"); props.put("acks", "1"); // NoResponse 0;WaitForLocal 1;WaitForAll -1 props.put("retries", 5); props.put("linger.ms", 100); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 配置压缩方式 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule " + "required username=\"" + userName + "\" password=\"" + passWord + "\";"); // 1.创建一个生产者对象 Producer<String, String> producer = new KafkaProducer<String, String>(props); // 2.调用send方法 for (int i = 0; i < 1000; i++) { // java sdk不区分发送一条消息还是批量发送消息,通过配置文件做区分,主要是通过batch.size, linger.ms, buffer.memory Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicId}", "testMessageValue-" + i)); RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS); // 默认返回offset为-1 System.out.println("partition=" + recordMetadata.partition() + "offset = " + recordMetadata.offset()); } // 3.关闭生产者 System.out.println("Kafka Producer 是异步发送数据,建议等待 10s 后再关闭进程,以确保所有的数据都成功发送,否则会导致上传的数据不完整。"); Thread.sleep(10000); producer.close(); } }
通过简单的参数配置,即可使用各类 Kafka Producer SDK 采集日志数据,并通过 Kafka 协议上传到日志服务。通过 Kafka Go SDK 上传日志的相关依赖和示例代码如下:
执行以下命令安装 Sarama。
go get github.com/Shopify/sarama@v1.38.1
导入 Sarama 等必要的依赖包,并上传日志。
参考以下示例代码通过 Kafka Go SDK 上传日志。
说明
执行以下示例代码之前请参考配置步骤中的结果预览正确填写 User
等参数配置。
package kafka_produce_example import ( "fmt" "time" "github.com/Shopify/sarama" ) func ExampleProduce() { config := sarama.NewConfig() config.Version = sarama.V2_0_0_0 // specify appropriate version config.ApiVersionsRequest = true config.Net.SASL.Mechanism = "PLAIN" config.Producer.Return.Successes = true config.Net.SASL.Version = int16(0) config.Net.SASL.Enable = true // projectId config.Net.SASL.User = "${projectId}" // 火山引擎账号的密钥,或具备对应权限的子账号密钥。不支持STS临时安全令牌。 config.Net.SASL.Password = "${access-key-id}#${access-key-secret}" config.Producer.RequiredAcks = sarama.WaitForLocal config.Producer.Compression = sarama.CompressionLZ4 config.Net.TLS.Enable = true // hosts为生产的地址,具体见文档 asyncProducer, err := sarama.NewAsyncProducer([]string{"${hosts}"}, config) if err != nil { fmt.Println("new producer error:" + err.Error()) panic(err) } msg := &sarama.ProducerMessage{ // ${topicID}为tls的日志主题ID,例如"0fdaa6b6-3c9f-424c-8664-fc0d222c****"。 Topic: "${topicID}", Value: sarama.StringEncoder("test"), } asyncProducer.Input() <- msg go func() { for { select { case msg := <-asyncProducer.Successes(): if msg != nil { // offset 默认返回为-1 fmt.Printf("send response; partition:%d, offset:%d\n", msg.Partition, msg.Offset) } case msg := <-asyncProducer.Errors(): if msg != nil { fmt.Println("producer send error:" + msg.Error()) } } } }() time.Sleep(time.Minute) _ = asyncProducer.Close() }
使用 Kafka 协议上传日志失败时,会按照 Kafka 的错误码返回对应的错误信息,请参考 Kafka error list获取更多信息。
除此之外,日志服务还在 Java 语言的 Kafka 错误码 SASLAuthenticationException
中封装了鉴权、配置相关参数的错误信息,详细说明如下:
错误信息 | 说明 |
---|---|
invalid SASL/PLAIN request: expected 3 tokens | 未配置 user 或者 password。请参考配置步骤中的结果预览正确填写配置。 |
invalid SASL/PLAIN request: empty projectId | 未配置 user 字段。请参考配置步骤中的结果预览正确填写配置。 |
invalid SASL/PLAIN request: Password format wrong | password 字段配置错误。请参考配置步骤中的结果预览正确填写配置。 |
invalid SASL/PLAIN request: empty AccessKey | 未配置 AK。请参考配置步骤中的结果预览正确填写配置。 |
invalid SASL/PLAIN request: empty SecretKey | 未配置 SK。请参考配置步骤中的结果预览正确填写配置。 |
invalid SASL/PLAIN request: Invalid projectId | 指定的 Project ID 不存在。请检查 Project ID 是否输入正确。 |
Access Denied. You are not authorized to perform this operation. | 当前用户不具备操作权限。建议检查用户权限。 |