You need to enable JavaScript to run this app.
日志服务

日志服务

复制全文
消费与投递
通过 ConsumeLogs 消费日志
复制全文
通过 ConsumeLogs 消费日志

日志服务提供消费日志的 API 接口 ConsumeLogs,支持实时消费采集到服务端的日志数据。本文档介绍如何通过日志服务 SDK 调用 ConsumeLogs 接口,实现日志全量数据的顺序读写。

背景信息

日志服务可作为日志数据的传输中转,提供类似 Kafka 的日志数据中转功能。通过日志服务的消费日志接口和多语言 SDK,您可以将 Go 等语言的应用作为消费者实时消费日志服务数据。关于消费日志的服务端接口,请查看 ConsumeLogs,关于日志服务提供的多语言 SDK,请参考 SDK 文档

说明

日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard。详细说明请参考 通过消费组消费数据

示例代码

参考以下示例代码,创建一个 ConsumeLogs.go 文件,调用接口 ConsumeLogs 读取日志数据,并实时消费日志数据。

import (
        "fmt"
        "time"
        . "github.com/volcengine/volc-sdk-golang/service/tls"
        "github.com/volcengine/volc-sdk-golang/service/tls/pb"
)
func consumeLogs(topicID string, shardID int) {
        client := NewClient(os.Getenv("LOG_TEST_ENDPOINT"), os.Getenv("LOG_TEST_ACCESS_KEY_ID"),
        //LOG_TEST_SECURITY_TOKEN为通过IAM的STS机制获取的临时安全令牌,使用临时Token时也应传入临时的AK、SK。详细说明请参考https://www.volcengine.com/docs/6470/160162。不使用临时Token时LOG_TEST_SECURITY_TOKEN传空即可。
                os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), os.Getenv("LOG_TEST_SECURITY_TOKEN"),
                os.Getenv("LOG_TEST_REGION"))
        // 获取最初的日志游标
        beginCursorResp, err := client.DescribeCursor(&DescribeCursorRequest{
                TopicID: topicID,
                ShardID: shardID,
                From:    "begin",
        })
        if err != nil {
                panic(err.Error())
        }
        // 获取最后的日志游标
        endCursorResp, err := client.DescribeCursor(&DescribeCursorRequest{
                TopicID: topicID,
                ShardID: shardID,
                From:    "end",
        })
        if err != nil {
                panic(err.Error())
        }
        // 获取某个时间点的日志游标
        timestampCursorResp, err := client.DescribeCursor(&DescribeCursorRequest{
                TopicID: topicID,
                ShardID: shardID,
                From:    "1661418000",
        })
        if err != nil {
                panic(err.Error())
        }
        var logGroupCount = 10
        // 消费十组日志
        consumedLogs, err := client.ConsumeLogs(&ConsumeLogsRequest{
                TopicID:       topicID,
                ShardID:       shardID,
                Cursor:        timestampCursorResp.Cursor,
                LogGroupCount: &logGroupCount,
        })
        if err != nil {
                panic(err.Error())
        }
        fmt.Println(consumedLogs)
        // 消费十组日志,且指定压缩格式为lz4(目前支持lz4)
        var compressionType = "lz4"
        consumedLogs, err = client.ConsumeLogs(&ConsumeLogsRequest{
                TopicID:       topicID,
                ShardID:       shardID,
                Cursor:        timestampCursorResp.Cursor,
                LogGroupCount: &logGroupCount,
                Compression:   &compressionType,
        })
        if err != nil {
                panic(err.Error())
        }
        // 指定起始、结尾消费日志
        consumedLogs, err = client.ConsumeLogs(&ConsumeLogsRequest{
                TopicID:       topicID,
                ShardID:       shardID,
                Cursor:        beginCursorResp.Cursor,
                EndCursor:     &endCursorResp.Cursor,
                LogGroupCount: &logGroupCount,
                Compression:   &compressionType,
        })
        if err != nil {
                panic(err.Error())
        }
}
最近更新时间:2023.11.13 16:31:31
这个页面对您有帮助吗?
有用
有用
无用
无用