日志服务支持通过 SDK 消费采集到服务端的日志数据。本文档通过示例代码演示如何通过 Go SDK 消费日志。
已安装日志服务 Go SDK。更多信息,请参见安装 Go SDK。
已执行以下命令安装 proto 依赖包。
go get -u github.com/gogo/protobuf/proto
已添加 VOLCENGINE_ACCESS_KEY_ID 等环境变量。环境变量的配置方式请参考配置身份认证信息。
注意
推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。
本文档通过示例代码演示如何通过 SDK 消费日志数据。Go SDK 支持通过以下方式写入日志:
写入方式 | 说明 |
---|---|
Consumer | 推荐。 |
ConsumeLogs | 不推荐。 |
通过 Go Consumer 消费日志数据的示例代码如下。
package tls import ( "context" "fmt" "os" "time" "github.com/pkg/errors" log_consumer "github.com/volcengine/volc-sdk-golang/service/tls/consumer" "github.com/volcengine/volc-sdk-golang/service/tls/pb" ) func launchConsumer() error { // 获取消费组的默认配置 consumerCfg := log_consumer.GetDefaultConsumerConfig() // 请配置您的Endpoint、Region、AccessKeyID、AccessKeySecret等基本信息 consumerCfg.Endpoint = os.Getenv("VOLCENGINE_ENDPOINT") consumerCfg.Region = os.Getenv("VOLCENGINE_REGION") consumerCfg.AccessKeyID = os.Getenv("VOLCENGINE_ACCESS_KEY_ID") consumerCfg.AccessKeySecret = os.Getenv("VOLCENGINE_ACCESS_KEY_SECRET") // 请配置您的日志项目ID和日志主题ID列表 consumerCfg.ProjectID = "<YOUR-PROJECT-ID>" consumerCfg.TopicIDList = []string{"<YOUR-TOPIC-ID>"} // 请配置您的消费组名称(若您未创建过消费组,SDK将默认为您创建指定名称的消费组) consumerCfg.ConsumerGroupName = "<CONSUMER-GROUP-NAME>" // 请配置消费者名称(同一个消费组的不同消费者需要保证不同名) consumerCfg.ConsumerName = "<CONSUMER_NAME>" // 定义日志消费函数,您可根据业务需要,自行实现处理LogGroupList的日志消费函数 // 下面展示了逐个打印消费到的每条日志的每个键值对的代码实现示例 var handleLogs = func(topicID string, shardID int, l *pb.LogGroupList) { fmt.Printf("received new logs from topic: %s, shard: %d\n", topicID, shardID) for _, logGroup := range l.LogGroups { for _, log := range logGroup.Logs { for _, content := range log.Contents { fmt.Printf("%s: %s\n", content.Key, content.Value) } } } } // 创建消费者 consumer, err := log_consumer.NewConsumer(context.TODO(), consumerCfg, handleLogs) if err != nil { return errors.Wrap(err, "get new consumer failed: ") } // 启动消费者消费 if err := consumer.Start(); err != nil { return errors.Wrap(err, "start consumer failed: ") } // 等待消费 <-time.After(time.Second * 60) // 停止消费 consumer.Stop() return nil } func main() { if err := launchConsumer(); err != nil { fmt.Println(err.Error()) } }
通过调用 ConsumeLogs 同步接口消费日志数据的示例代码如下。
package tls import ( "fmt" "github.com/volcengine/volc-sdk-golang/service/tls" "github.com/volcengine/volc-sdk-golang/service/tls/pb" ) func main() { // 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455 // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空 client := tls.NewClient(os.Getenv("VOLCENGINE_ENDPOINT"), os.Getenv("VOLCENGINE_ACCESS_KEY_ID"), os.Getenv("VOLCENGINE_ACCESS_KEY_SECRET"), os.Getenv("VOLCENGINE_TOKEN"), os.Getenv("VOLCENGINE_REGION")) // 请填写您希望消费日志的TopicID和shardID topicID := "TopicId" shardID := 0 // 获取日志消费的起始游标 // DescribeCursor API的请求参数规范和限制请参阅https://www.volcengine.com/docs/6470/112193 describeCursorResp, err := client.DescribeCursor(&tls.DescribeCursorRequest{ TopicID: topicID, ShardID: shardID, From: "begin", }) if err != nil { // 处理错误 fmt.Println(err.Error()) } beginCursor := describeCursorResp.Cursor // 消费日志数据 // 请根据您的需要,填写TopicId、ShardId、Cursor、LogGroupCount、Compression等参数,推荐您使用lz4压缩 // 您可再次调用DescribeCursor接口获取日志消费的结束游标,作为ConsumeLogs接口的EndCursor参数值 // ConsumeLogs API的请求参数规范和限制请参阅https://www.volcengine.com/docs/6470/112194 logGroupCount := 1000 compression := "lz4" resp, err := client.ConsumeLogs(&tls.ConsumeLogsRequest{ TopicID: topicID, ShardID: shardID, Cursor: beginCursor, LogGroupCount: &logGroupCount, Compression: &compression, }) if err != nil { // 处理错误 fmt.Println(err.Error()) } handleLogs(topicID, shardID, resp.Logs) } // 定义日志消费函数,您可根据业务需要,自行实现处理LogGroupList的日志消费函数 // 下面展示了逐个打印消费到的每条日志的每个键值对的代码实现示例 func handleLogs(topicID string, shardID int, l *pb.LogGroupList) { fmt.Printf("received new logs from topic: %s, shard: %d\n", topicID, shardID) for _, logGroup := range l.LogGroups { for _, log := range logGroup.Logs { for _, content := range log.Contents { fmt.Printf("%s: %s\n", content.Key, content.Value) } } } }