日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据。本文档介绍如何使用 Go SDK 消费组消费日志。
日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard,您无需关注消费组的内部调度细节及消费者之间的负载均衡、故障转移等,只需要专注于业务逻辑。
关于消费组消费日志数据的基本概念等背景信息,请参考通过消费组消费数据。
说明
日志服务 SDK 消费组实现了请求失败自动重试、消费进度检查点自动上报等机制。因此,您仅需要关注于如何处理每次消费得到的 LogGroupList 的业务逻辑实现即可。
Go SDK 中,log_consumer.GetDefaultConsumerConfig()
函数返回消费组的默认配置 consumerCfg,并支持配置 Endpoint、Region、AccessKeyID、AccessKeySecret 等基本信息、日志项目 ID 和日志主题 ID 列表、消费组名称和消费者名称。除此之外,您还可以通过 consumerCfg 的其他字段进行其他自定义配置。
consumerCfg 支持的字段如下所示。
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
MaxFetchLogGroupCount | Integer | 100 | 消费者单次消费日志时,最大获取 LogGroup 数量,默认为 100,最大为 1000。 |
HeartbeatIntervalInSecond | Integer | 20 | Consumer 心跳上报时间间隔,单位为秒。 |
DataFetchIntervalInMillisecond | Integer | 200 | Consumer 消费日志时间间隔,单位为毫秒。 |
FlushCheckpointIntervalSecond | Integer | 5 | Consumer 上传消费进度的时间间隔,单位为秒。 |
ConsumeFrom | String | begin | 开始消费时的默认消费位点,与 DescribeCursor 的 From 参数一致。仅在该消费者从未上传过消费位点时有效。 |
OrderedConsume | Boolean | false | 是否开启顺序消费。开启顺序消费后,消费者会根据 Shard 分裂的父子关系进行消费。 |
LoggerConfig | LoggerConfig | / | 日志相关配置,详细信息请参考下表。 |
LoggerConfig 配置说明:
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
LogLevel | String | info | 设置日志输出级别,默认值为 Info。consumer 中一共有 4 种日志输出级别,分别为 debug、info、warn 和 error。 |
LogFileName | String | 50 | 日志文件输出路径。若未设置,则默认输出到 stdout。 |
IsJsonType | Boolean | false | 是否格式化文件输出格式,默认为 false。 |
LogMaxSize | Integer | 10 | 单个日志存储数量,默认为 10MiB。 |
LogCompress | Boolean | false | 日志是否开启压缩。 |
以下代码以 Go SDK 为例,演示通过 SDK 创建消费组和消费者,并消费日志的整体流程。
package tls import ( "context" "fmt" "os" "time" "github.com/pkg/errors" log_consumer "github.com/volcengine/volc-sdk-golang/service/tls/consumer" "github.com/volcengine/volc-sdk-golang/service/tls/pb" ) func launchConsumer() error { // 获取消费组的默认配置 consumerCfg := log_consumer.GetDefaultConsumerConfig() // 推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455 // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空 consumerCfg.Endpoint = os.Getenv("VOLCENGINE_ENDPOINT") consumerCfg.Region = os.Getenv("VOLCENGINE_REGION") consumerCfg.AccessKeyID = os.Getenv("VOLCENGINE_ACCESS_KEY_ID") consumerCfg.AccessKeySecret = os.Getenv("VOLCENGINE_ACCESS_KEY_SECRET") // 请配置您的日志项目ID和日志主题ID列表 consumerCfg.ProjectID = "<YOUR-PROJECT-ID>" consumerCfg.TopicIDList = []string{"<YOUR-TOPIC-ID>"} // 请配置您的消费组名称(若您未创建过消费组,SDK将默认为您创建指定名称的消费组) consumerCfg.ConsumerGroupName = "<CONSUMER-GROUP-NAME>" // 请配置消费者名称(同一个消费组的不同消费者需要保证不同名) consumerCfg.ConsumerName = "<CONSUMER_NAME>" // 定义日志消费函数,您可根据业务需要,自行实现处理LogGroupList的日志消费函数 // 下面展示了逐个打印消费到的每条日志的每个键值对的代码实现示例 var handleLogs = func(topicID string, shardID int, l *pb.LogGroupList) { fmt.Printf("received new logs from topic: %s, shard: %d\n", topicID, shardID) for _, logGroup := range l.LogGroups { for _, log := range logGroup.Logs { for _, content := range log.Contents { fmt.Printf("%s: %s\n", content.Key, content.Value) } } } } // 创建消费者 consumer, err := log_consumer.NewConsumer(context.TODO(), consumerCfg, handleLogs) if err != nil { return errors.Wrap(err, "get new consumer failed: ") } // 启动消费者消费 if err := consumer.Start(); err != nil { return errors.Wrap(err, "start consumer failed: ") } // 等待消费 <-time.After(time.Second * 60) // 停止消费 consumer.Stop() return nil } func main() { if err := launchConsumer(); err != nil { fmt.Println(err.Error()) } }