日志服务提供消费日志的 API 接口 ConsumeLogs,支持实时消费采集到服务端的日志数据。本文档介绍如何通过日志服务 SDK 调用 ConsumeLogs 接口,实现日志全量数据的顺序读写。
日志服务可作为日志数据的传输中转,提供类似 Kafka 的日志数据中转功能。通过日志服务的消费日志接口和多语言 SDK,您可以将 Go 等语言的应用作为消费者实时消费日志服务数据。关于消费日志的服务端接口,请查看 ConsumeLogs,关于日志服务提供的多语言 SDK,请参考 SDK 文档。
说明
日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard。详细说明请参考 通过消费组消费数据。
参考以下示例代码,创建一个 ConsumeLogs.go
文件,调用接口 ConsumeLogs
读取日志数据,并实时消费日志数据。
import ( "fmt" "time" . "github.com/volcengine/volc-sdk-golang/service/tls" "github.com/volcengine/volc-sdk-golang/service/tls/pb" ) func consumeLogs(topicID string, shardID int) { client := NewClient(os.Getenv("LOG_TEST_ENDPOINT"), os.Getenv("LOG_TEST_ACCESS_KEY_ID"), //LOG_TEST_SECURITY_TOKEN为通过IAM的STS机制获取的临时安全令牌,使用临时Token时也应传入临时的AK、SK。详细说明请参考https://www.volcengine.com/docs/6470/160162。不使用临时Token时LOG_TEST_SECURITY_TOKEN传空即可。 os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), os.Getenv("LOG_TEST_SECURITY_TOKEN"), os.Getenv("LOG_TEST_REGION")) // 获取最初的日志游标 beginCursorResp, err := client.DescribeCursor(&DescribeCursorRequest{ TopicID: topicID, ShardID: shardID, From: "begin", }) if err != nil { panic(err.Error()) } // 获取最后的日志游标 endCursorResp, err := client.DescribeCursor(&DescribeCursorRequest{ TopicID: topicID, ShardID: shardID, From: "end", }) if err != nil { panic(err.Error()) } // 获取某个时间点的日志游标 timestampCursorResp, err := client.DescribeCursor(&DescribeCursorRequest{ TopicID: topicID, ShardID: shardID, From: "1661418000", }) if err != nil { panic(err.Error()) } var logGroupCount = 10 // 消费十组日志 consumedLogs, err := client.ConsumeLogs(&ConsumeLogsRequest{ TopicID: topicID, ShardID: shardID, Cursor: timestampCursorResp.Cursor, LogGroupCount: &logGroupCount, }) if err != nil { panic(err.Error()) } fmt.Println(consumedLogs) // 消费十组日志,且指定压缩格式为lz4(目前支持lz4) var compressionType = "lz4" consumedLogs, err = client.ConsumeLogs(&ConsumeLogsRequest{ TopicID: topicID, ShardID: shardID, Cursor: timestampCursorResp.Cursor, LogGroupCount: &logGroupCount, Compression: &compressionType, }) if err != nil { panic(err.Error()) } // 指定起始、结尾消费日志 consumedLogs, err = client.ConsumeLogs(&ConsumeLogsRequest{ TopicID: topicID, ShardID: shardID, Cursor: beginCursorResp.Cursor, EndCursor: &endCursorResp.Cursor, LogGroupCount: &logGroupCount, Compression: &compressionType, }) if err != nil { panic(err.Error()) } }