You need to enable JavaScript to run this app.
导航
通过 Go SDK 消费日志数据
最近更新时间:2023.11.23 22:10:08首次发布时间:2023.11.23 22:10:08

日志服务支持通过 SDK 消费采集到服务端的日志数据。本文档通过示例代码演示如何通过 Go SDK 消费日志。

前提条件

  • 已安装日志服务 Go SDK。更多信息,请参见安装 Go SDK

  • 已执行以下命令安装 proto 依赖包。

    go get -u github.com/gogo/protobuf/proto
    
  • 已添加 VOLCENGINE_ACCESS_KEY_ID 等环境变量。环境变量的配置方式请参考配置身份认证信息

    注意

    推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。

消费日志

场景说明

本文档通过示例代码演示如何通过 SDK 消费日志数据。Go SDK 支持通过以下方式写入日志:

写入方式

说明

Consumer

推荐。
在实际生产环境中,为了提高数据消费效率,建议通过 Go Consumer 方式消费日志数据。Consumer 支持负载均衡地消费日志主题下所有分区的数据,具有异步消费、高性能、失败重试、优雅关闭等特性。示例代码请参考Consumer 消费日志数据,通过消费组消费日志的详细说明请参考通过消费组消费数据通过 Go SDK 消费组消费日志

ConsumeLogs

不推荐。
日志服务支持通过 ConsumeLogs 接口同步请求的方式上传日志。消费日志的进度受限于单个 Shard 的读写能力,还需要自行维护消费进度,在 Shard 自动分裂的场景下消费逻辑与流程繁琐。
如果您在调用 PutLogs 时选择了 HashKey 路由 Shard 模式,日志数据将有序写入到指定分区中。在这种场景下,您可调用 ConsumeLogs 接口针对性地消费某个分区的日志数据。示例代码请参考ConsumeLogs 同步接口消费日志

Consumer 消费日志数据

通过 Go Consumer 消费日志数据的示例代码如下。

package tls

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/pkg/errors"
    log_consumer "github.com/volcengine/volc-sdk-golang/service/tls/consumer"
    "github.com/volcengine/volc-sdk-golang/service/tls/pb"
)

func launchConsumer() error {
    // 获取消费组的默认配置
    consumerCfg := log_consumer.GetDefaultConsumerConfig()
    // 请配置您的Endpoint、Region、AccessKeyID、AccessKeySecret等基本信息
    consumerCfg.Endpoint = os.Getenv("VOLCENGINE_ENDPOINT")
    consumerCfg.Region = os.Getenv("VOLCENGINE_REGION")
    consumerCfg.AccessKeyID = os.Getenv("VOLCENGINE_ACCESS_KEY_ID")
    consumerCfg.AccessKeySecret = os.Getenv("VOLCENGINE_ACCESS_KEY_SECRET")
    // 请配置您的日志项目ID和日志主题ID列表
    consumerCfg.ProjectID = "<YOUR-PROJECT-ID>"
    consumerCfg.TopicIDList = []string{"<YOUR-TOPIC-ID>"}
    // 请配置您的消费组名称(若您未创建过消费组,SDK将默认为您创建指定名称的消费组)
    consumerCfg.ConsumerGroupName = "<CONSUMER-GROUP-NAME>"
    // 请配置消费者名称(同一个消费组的不同消费者需要保证不同名)
    consumerCfg.ConsumerName = "<CONSUMER_NAME>"

    // 定义日志消费函数,您可根据业务需要,自行实现处理LogGroupList的日志消费函数
    // 下面展示了逐个打印消费到的每条日志的每个键值对的代码实现示例
    var handleLogs = func(topicID string, shardID int, l *pb.LogGroupList) {
       fmt.Printf("received new logs from topic: %s, shard: %d\n", topicID, shardID)
       for _, logGroup := range l.LogGroups {
          for _, log := range logGroup.Logs {
             for _, content := range log.Contents {
                fmt.Printf("%s: %s\n", content.Key, content.Value)
             }
          }
       }
    }
    
    // 创建消费者
    consumer, err := log_consumer.NewConsumer(context.TODO(), consumerCfg, handleLogs)
    if err != nil {
       return errors.Wrap(err, "get new consumer failed: ")
    }

    // 启动消费者消费
    if err := consumer.Start(); err != nil {
       return errors.Wrap(err, "start consumer failed: ")
    }

    // 等待消费
    <-time.After(time.Second * 60)

    // 停止消费
    consumer.Stop()

    return nil
}

func main() {
    if err := launchConsumer(); err != nil {
       fmt.Println(err.Error())
    }
}

ConsumeLogs 同步接口消费日志

通过调用 ConsumeLogs 同步接口消费日志数据的示例代码如下。

package tls

import (
    "fmt"
    "github.com/volcengine/volc-sdk-golang/service/tls"
    "github.com/volcengine/volc-sdk-golang/service/tls/pb"
)

func main() {
    // 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455
    // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空
    client := tls.NewClient(os.Getenv("VOLCENGINE_ENDPOINT"), os.Getenv("VOLCENGINE_ACCESS_KEY_ID"),
        os.Getenv("VOLCENGINE_ACCESS_KEY_SECRET"), os.Getenv("VOLCENGINE_TOKEN"), os.Getenv("VOLCENGINE_REGION"))

    // 请填写您希望消费日志的TopicID和shardID
    topicID := "TopicId"
    shardID := 0

    // 获取日志消费的起始游标
    // DescribeCursor API的请求参数规范和限制请参阅https://www.volcengine.com/docs/6470/112193
    describeCursorResp, err := client.DescribeCursor(&tls.DescribeCursorRequest{
       TopicID: topicID,
       ShardID: shardID,
       From:    "begin",
    })
    if err != nil {
       // 处理错误
       fmt.Println(err.Error())
    }
    beginCursor := describeCursorResp.Cursor

    // 消费日志数据
    // 请根据您的需要,填写TopicId、ShardId、Cursor、LogGroupCount、Compression等参数,推荐您使用lz4压缩
    // 您可再次调用DescribeCursor接口获取日志消费的结束游标,作为ConsumeLogs接口的EndCursor参数值
    // ConsumeLogs API的请求参数规范和限制请参阅https://www.volcengine.com/docs/6470/112194
    logGroupCount := 1000
    compression := "lz4"
    resp, err := client.ConsumeLogs(&tls.ConsumeLogsRequest{
       TopicID:       topicID,
       ShardID:       shardID,
       Cursor:        beginCursor,
       LogGroupCount: &logGroupCount,
       Compression:   &compression,
    })
    if err != nil {
       // 处理错误
       fmt.Println(err.Error())
    }

    handleLogs(topicID, shardID, resp.Logs)
}

// 定义日志消费函数,您可根据业务需要,自行实现处理LogGroupList的日志消费函数
// 下面展示了逐个打印消费到的每条日志的每个键值对的代码实现示例
func handleLogs(topicID string, shardID int, l *pb.LogGroupList) {
    fmt.Printf("received new logs from topic: %s, shard: %d\n", topicID, shardID)
    for _, logGroup := range l.LogGroups {
       for _, log := range logGroup.Logs {
          for _, content := range log.Contents {
             fmt.Printf("%s: %s\n", content.Key, content.Value)
          }
       }
    }
}

相关文档

  • 通过 SDK 发送调用 API 的请求以后,您会收到服务端的响应,如果响应中包含 200 以外的状态码,表示接口调用失败。您可以参考各个 API 的文档查看对应的错误码信息。
  • 关于 Go Consumer 的详细信息,请参考 Go Consumer
  • 关于 Go Consumer 消费日志的完整示例代码,请参考 TLS Go SDK Demo on GitHub