日志服务支持通过消费组消费数据,并提供一系列 SDK 供您管理消费组。本文档介绍通过日志服务消费组消费数据的相关操作。
日志服务提供消费日志的 OpenAPI 接口 ConsumeLogs,支持实时消费采集到服务端的日志数据。在使用 ConsumeLogs 接口时,需要按照日志分区维度消费日志数据,消费时自行指定日志主题 ID、Shard ID 和起始结束游标(Cursor),所以消费日志的进度受限于单个 Shard 的读写能力,还需要自行维护消费进度,在 Shard 自动分裂的场景下消费逻辑与流程繁琐。
日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard,您无需关注消费组的内部调度细节及消费者之间的负载均衡、故障转移等,只需要专注于业务逻辑。
概念 | 说明 |
---|---|
消费组(ConsumerGroup) | 多个消费者组成的虚拟集合。以消费组维度消费日志数据时,消费组中的所有消费者订阅同一个日志主题,共同消费一个日志主题中的数据。每个消费者消费日志主题中一个或多个 Shard 的数据,各个消费者间不会重复消费数据。 |
消费者(Consumer) | 一个从日志服务中消费数据的客户端,是消费组的组成部分。
|
消费位点(Checkpoint) | 一个 Shard 在被一个消费者消费的过程中,会随时记录当前 Shard 的消费位点(即游标进度)并上报服务端,以此来作为程序重启时的起始消费游标,从而保证数据不会被重复消费。 |
日志服务提供了 Consumer 异步日志消费库,支持消费同一个日志项目下多个日志主题,具有异步消费、高性能、失败重试、优雅关闭等特性。您可以通过日志服务 Go SDK、Java SDK 或 Python SDK 管理消费组,并消费日志数据。
以下代码以 Go SDK 为例,演示通过 SDK 创建消费组和消费者,并消费日志的整体流程。
package tls import ( "context" "fmt" "os" "time" "github.com/pkg/errors" log_consumer "github.com/volcengine/volc-sdk-golang/service/tls/consumer" "github.com/volcengine/volc-sdk-golang/service/tls/pb" ) func launchConsumer() error { // 获取消费组的默认配置 consumerCfg := log_consumer.GetDefaultConsumerConfig() // 请配置您的Endpoint、Region、AccessKeyID、AccessKeySecret等基本信息 consumerCfg.Endpoint = os.Getenv("LOG_SERVICE_ENDPOINT") consumerCfg.Region = os.Getenv("LOG_SERVICE_REGION") consumerCfg.AccessKeyID = os.Getenv("LOG_SERVICE_AK") consumerCfg.AccessKeySecret = os.Getenv("LOG_SERVICE_SK") // 请配置您的日志项目ID和日志主题ID列表 consumerCfg.ProjectID = "<YOUR-PROJECT-ID>" consumerCfg.TopicIDList = []string{"<YOUR-TOPIC-ID>"} // 请配置您的消费组名称(若您未创建过消费组,SDK将默认为您创建指定名称的消费组) consumerCfg.ConsumerGroupName = "<CONSUMER-GROUP-NAME>" // 请配置消费者名称(同一个消费组的不同消费者需要保证不同名) consumerCfg.ConsumerName = "<CONSUMER_NAME>" // 定义日志消费函数,您可根据业务需要,自行实现处理LogGroupList的日志消费函数 // 下面展示了逐个打印消费到的每条日志的每个键值对的代码实现示例 var handleLogs = func(topicID string, shardID int, l *pb.LogGroupList) { fmt.Printf("received new logs from topic: %s, shard: %d\n", topicID, shardID) for _, logGroup := range l.LogGroups { for _, log := range logGroup.Logs { for _, content := range log.Contents { fmt.Printf("%s: %s\n", content.Key, content.Value) } } } } // 创建消费者 consumer, err := log_consumer.NewConsumer(context.TODO(), consumerCfg, handleLogs) if err != nil { return errors.Wrap(err, "get new consumer failed: ") } // 启动消费者消费 if err := consumer.Start(); err != nil { return errors.Wrap(err, "start consumer failed: ") } // 等待消费 <-time.After(time.Second * 60) // 停止消费 consumer.Stop() return nil } func main() { if err := launchConsumer(); err != nil { fmt.Println(err.Error()) } }
日志服务 SDK 支持通过参数配置消费组消费的各种细节配置,例如是否开启顺序消费、Consumer 心跳上报时间间隔等,您可以通过这些配置管理消费组的各种消费逻辑。
例如,在上述 Go SDK 示例代码中,log_consumer.GetDefaultConsumerConfig()
函数返回了消费组的默认配置 consumerCfg,并向您展示了如何配置您的 Endpoint、Region、AccessKeyID、AccessKeySecret 等基本信息、日志项目 ID 和日志主题 ID 列表、消费组名称和消费者名称。除此之外,您还可通过 consumerCfg 的其他字段进行其他自定义配置。consumerCfg 支持的字段如下所示。
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
MaxFetchLogGroupCount | Integer | 100 | 消费者单次消费日志时,最大获取 LogGroup 数量,默认为 100,最大为 1000。 |
HeartbeatIntervalInSecond | Integer | 20 | Consumer 心跳上报时间间隔,单位为秒。 |
DataFetchIntervalInMillisecond | Integer | 200 | Consumer 消费日志时间间隔,单位为毫秒。 |
FlushCheckpointIntervalSecond | Integer | 5 | Consumer 上传消费进度的时间间隔,单位为秒。 |
ConsumeFrom | String | begin | 开始消费时的默认消费位点,与 DescribeCursor 的 From 参数一致。仅在该消费者从未上传过消费位点时有效。 |
OrderedConsume | Boolean | FALSE | 是否开启顺序消费。开启顺序消费后,消费者会根据 Shard 分裂的父子关系进行消费。 |
LoggerConfig | LoggerConfig | / | 日志相关配置,详细信息请参考下表。 |
LoggerConfig 配置说明:
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
LogLevel | String | info | 设置日志输出级别,默认值为 info。consumer 中一共有 4 种日志输出级别,分别为 debug、info、warn 和 error。 |
LogFileName | String | 50 | 日志文件输出路径。若未设置,则默认输出到 stdout。 |
IsJsonType | Boolean | true | 是否格式化文件输出格式,默认为 false。 |
LogMaxSize | Integer | 10 | 单个日志存储数量,默认为 10MiB。 |
LogCompress | Boolean | true | 日志是否开启压缩,默认为 false。 |
日志服务 SDK 消费组实现了请求失败自动重试、消费进度消费位点自动上报等机制。因此,您仅需要关注于如何处理每次消费得到的 LogGroupList 的业务逻辑实现即可。
在使用消费者消费日志数据的过程中,您可以随时查看当前的消费进度。在离线数据处理等场景下,如果需要消费过去某个时段的日志数据,或从指定位置消费数据,可以通过重置消费位点的方式为消费组指定新的 checkpoint,表示从指定位置开始消费数据。
日志服务控制台支持查看消费进度,重置消费组在订阅的 Topic 上的所有消费位点,您也可以通过 API 重置指定分区的消费位点。
说明
重置消费位点之前,应关闭对应的消费进程,如果重置消费组的所有消费位点,则关闭所有消费进程,否则消费位点重置失败。
在控制台相关的操作步骤如下:
在左侧消费组列表中单击消费组名称。
在页面右上角单击重置消费位点。
选择重置位置。
重置位置 | 说明 |
---|---|
最早位置 | 从日志主题上最早的一条数据开始消费。 |
最新位置 | 跳过所有历史数据,直接从日志主题上最近写入的一条数据开始消费。 |
指定时间点 | 从过去某个指定时间点开始消费,该时间点以日志主题的数据保留时长为准。指定时间点重置消费点时,不支持指定为超出数据保留时长的历史时刻或未来时刻。 |
单击确定。
API | 说明 |
---|---|
调用接口 CreateConsumerGroup 创建日志服务消费组(ConsumerGroup)。 | |
调用接口 DeleteConsumerGroup 删除日志服务消费组。 | |
调用接口 DescribeConsumerGroups 获取一个日志项目下的所有消费组信息。 | |
调用接口 ModifyConsumerGroup 修改日志服务消费组的配置。 | |
调用接口 ConsumerHeartbeat 向日志服务发送消费组中的一个消费者的心跳信息。 | |
调用接口 DescribeCheckPoint 查看指定消费组在指定日志分区上的当前消费位点。 | |
调用接口 ModifyCheckPoint 为指定消费组重置指定分区的消费位点。 | |
调用接口 ResetCheckPoint 为指定消费组重置全部消费位点。 |