You need to enable JavaScript to run this app.
导航
通过 Go SDK 消费组消费日志
最近更新时间:2023.11.23 21:39:11首次发布时间:2023.11.23 21:39:11

日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据。本文档介绍如何使用 Go SDK 消费组消费日志。

前提条件

  • 已创建并获取火山引擎密钥 AccessKey。
  • 火山引擎账号的访问密钥 AccessKey 拥有所有 API 的全部权限。建议您通过 IAM 用户进行 API 相关操作和日常运维。使用 IAM 用户前,主账号应为 IAM 用户授予消费组相关的权限。授权示例请参考基于 IAM 管理权限
  • 已安装日志服务 SDK。安装步骤请参考安装 Go SDK

配置说明

日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard,您无需关注消费组的内部调度细节及消费者之间的负载均衡、故障转移等,只需要专注于业务逻辑。
关于消费组消费日志数据的基本概念等背景信息,请参考通过消费组消费数据

说明

日志服务 SDK 消费组实现了请求失败自动重试、消费进度检查点自动上报等机制。因此,您仅需要关注于如何处理每次消费得到的 LogGroupList 的业务逻辑实现即可。

Go SDK 中,log_consumer.GetDefaultConsumerConfig() 函数返回消费组的默认配置 consumerCfg,并支持配置 Endpoint、Region、AccessKeyID、AccessKeySecret 等基本信息、日志项目 ID 和日志主题 ID 列表、消费组名称和消费者名称。除此之外,您还可以通过 consumerCfg 的其他字段进行其他自定义配置。
consumerCfg 支持的字段如下所示。

参数

类型

示例值

描述

MaxFetchLogGroupCount

Integer

100

消费者单次消费日志时,最大获取 LogGroup 数量,默认为 100,最大为 1000。

HeartbeatIntervalInSecond

Integer

20

Consumer 心跳上报时间间隔,单位为秒。

DataFetchIntervalInMillisecond

Integer

200

Consumer 消费日志时间间隔,单位为毫秒。

FlushCheckpointIntervalSecond

Integer

5

Consumer 上传消费进度的时间间隔,单位为秒。

ConsumeFrom

String

begin

开始消费时的默认消费位点,与 DescribeCursor 的 From 参数一致。仅在该消费者从未上传过消费位点时有效。

OrderedConsume

Boolean

false

是否开启顺序消费。开启顺序消费后,消费者会根据 Shard 分裂的父子关系进行消费。
例如 Shard0 分裂为 Shard1 与 Shard2,而 Shard1 又分裂为 Shard3 与 Shard4。在开启顺序消费之后,会根据 (Shard0) -> (Shard1, Shard2) -> (Shard2, Shard3, Shard4) 的顺序进行消费。

LoggerConfig

LoggerConfig

/

日志相关配置,详细信息请参考下表。

LoggerConfig 配置说明:

参数

类型

示例值

描述

LogLevel

String

info

设置日志输出级别,默认值为 Info。consumer 中一共有 4 种日志输出级别,分别为 debug、info、warn 和 error。

LogFileName

String

50

日志文件输出路径。若未设置,则默认输出到 stdout。

IsJsonType

Boolean

false

是否格式化文件输出格式,默认为 false。

LogMaxSize

Integer

10

单个日志存储数量,默认为 10MiB。

LogCompress

Boolean

false

日志是否开启压缩。

示例代码

以下代码以 Go SDK 为例,演示通过 SDK 创建消费组和消费者,并消费日志的整体流程。

package tls

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/pkg/errors"
    log_consumer "github.com/volcengine/volc-sdk-golang/service/tls/consumer"
    "github.com/volcengine/volc-sdk-golang/service/tls/pb"
)

func launchConsumer() error {
    // 获取消费组的默认配置
    consumerCfg := log_consumer.GetDefaultConsumerConfig()
    // 推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455
    // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空
    consumerCfg.Endpoint = os.Getenv("VOLCENGINE_ENDPOINT") 
    consumerCfg.Region = os.Getenv("VOLCENGINE_REGION")
    consumerCfg.AccessKeyID = os.Getenv("VOLCENGINE_ACCESS_KEY_ID")
    consumerCfg.AccessKeySecret = os.Getenv("VOLCENGINE_ACCESS_KEY_SECRET")
    // 请配置您的日志项目ID和日志主题ID列表
    consumerCfg.ProjectID = "<YOUR-PROJECT-ID>"
    consumerCfg.TopicIDList = []string{"<YOUR-TOPIC-ID>"}
    // 请配置您的消费组名称(若您未创建过消费组,SDK将默认为您创建指定名称的消费组)
    consumerCfg.ConsumerGroupName = "<CONSUMER-GROUP-NAME>"
    // 请配置消费者名称(同一个消费组的不同消费者需要保证不同名)
    consumerCfg.ConsumerName = "<CONSUMER_NAME>"

    // 定义日志消费函数,您可根据业务需要,自行实现处理LogGroupList的日志消费函数
    // 下面展示了逐个打印消费到的每条日志的每个键值对的代码实现示例
    var handleLogs = func(topicID string, shardID int, l *pb.LogGroupList) {
       fmt.Printf("received new logs from topic: %s, shard: %d\n", topicID, shardID)
       for _, logGroup := range l.LogGroups {
          for _, log := range logGroup.Logs {
             for _, content := range log.Contents {
                fmt.Printf("%s: %s\n", content.Key, content.Value)
             }
          }
       }
    }
    
    // 创建消费者
    consumer, err := log_consumer.NewConsumer(context.TODO(), consumerCfg, handleLogs)
    if err != nil {
       return errors.Wrap(err, "get new consumer failed: ")
    }

    // 启动消费者消费
    if err := consumer.Start(); err != nil {
       return errors.Wrap(err, "start consumer failed: ")
    }

    // 等待消费
    <-time.After(time.Second * 60)

    // 停止消费
    consumer.Stop()

    return nil
}

func main() {
    if err := launchConsumer(); err != nil {
       fmt.Println(err.Error())
    }
}