You need to enable JavaScript to run this app.
文档中心
日志服务

日志服务

复制全文
Java SDK
通过 Java SDK 消费组消费日志
复制全文
通过 Java SDK 消费组消费日志

日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据。本文档介绍如何使用 Java SDK 消费组消费日志。

前提条件

  • 已创建并获取火山引擎密钥 AccessKey。
  • 火山引擎账号的访问密钥 AccessKey 拥有所有 API 的全部权限。建议您通过 IAM 用户进行 API 相关操作和日常运维。使用 IAM 用户前,主账号应为 IAM 用户授予消费组相关的权限。授权示例请参考基于 IAM 管理权限
  • 已安装日志服务 SDK。安装步骤请参考安装 Java SDK

配置说明

日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard,您无需关注消费组的内部调度细节及消费者之间的负载均衡、故障转移等,只需要专注于业务逻辑。
关于消费组消费日志数据的基本概念等背景信息,请参考通过消费组消费数据

说明

日志服务 SDK 消费组实现了请求失败自动重试、消费进度检查点自动上报等机制。因此,您仅需要关注于如何处理每次消费得到的 LogGroupList 的业务逻辑实现即可。

Java SDK 中,ConsumerConfig 类的构造函数返回了Java SDK 消费组的默认配置 config,config 中应配置 endpoint、region、accessKeyID、accessKeySecret等基本信息、日志项目 ID 和日志主题 ID 列表、消费组名称和消费者名称。
除此之外,您还可通过 ConsumerConfig 其他字段的 setter 方法进行额外的自定义配置。ConsumerConfig 支持的参数如下:

参数

类型

示例值

描述

maxFetchLogGroupCount

int

100

消费者单次消费日志时,获取的最大 LogGroup 数量,默认为 100,最大为 1000。

heartbeatIntervalInSecond

int

20

Consumer 心跳上报时间间隔,单位为秒。

dataFetchIntervalInMillisecond

int

200

Consumer 消费日志时间间隔,单位为毫秒。

flushCheckpointIntervalInSecond

int

5

Consumer 上传消费进度的时间间隔,单位为秒。

consumeFrom

String

begin

开始消费时的默认消费位点,与 DescribeCursor 的 From 参数一致。仅在该消费者从未上传过消费位点时有效。

orderedConsume

boolean

false

是否开启顺序消费。开启顺序消费后,消费者会根据 Shard 分裂的父子关系进行消费。
例如 Shard0 分裂为 Shard1 与 Shard2,而 Shard1 又分裂为 Shard3 与 Shard4。在开启顺序消费之后,会根据 (Shard0) -> (Shard1, Shard2) -> (Shard2, Shard3, Shard4) 的顺序进行消费。

示例代码

以下代码以 Java SDK 为例,演示通过 SDK 创建消费组和消费者,并消费日志的整体流程。

package com.volcengine.example.tls.demo;

import java.util.ArrayList;
import java.util.List;

import com.volcengine.model.tls.consumer.ConsumerConfig;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.service.tls.consumer.Consumer;
import com.volcengine.service.tls.consumer.ConsumerImpl;
import com.volcengine.service.tls.consumer.LogProcessor;


// 您需要定义一个实现LogProcessor接口的类
public class ConsumerDemo implements LogProcessor {
    public static void main(String[] args) throws LogException, InterruptedException {
        // 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455
        // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空 
        ConsumerConfig config = new ConsumerConfig(System.getenv("VOLCENGINE_ENDPOINT"), System.getenv("VOLCENGINE_REGION"),
                System.getenv("VOLCENGINE_ACCESS_KEY_ID"), System.getenv("VOLCENGINE_ACCESS_KEY_SECRET"), System.getenv("VOLCENGINE_TOKEN"));
        // 请配置您的日志项目ID
        config.setProjectID("your-project-id");
        // 请配置您待消费的日志主题ID列表
        config.setTopicIDList(new ArrayList<String>(){{
            add("your-topic-id");
        }});
        // 请配置您的消费组名称
        config.setConsumerGroupName("java-consumer-group");
        // 请配置消费者名称
        config.setConsumerName("java-consumer");

        // 实例化ConsumerImpl,调用consumer.start()开始持续消费
        Consumer consumer = new ConsumerImpl(config, new ConsumerDemo());
        consumer.start();

        // 可通过调用consumer.stop()来结束消费组消费
        Thread.sleep(10000);
        consumer.stop();
    }

    /**
     * 您需要根据业务需要,自行实现这里的process方法,用于处理每次消费得到的LogGroupList
     * 下面给出了逐个打印消费到的日志的代码示例
     */
    @Override
    public void process(String topicID, int shardID, PutLogRequest.LogGroupList logGroupList) {
        System.out.println(topicID + " --- " + shardID);
        System.out.println(logGroupList.getLogGroupsCount());

        int count = 0;

        List<PutLogRequest.LogGroup> logGroups = logGroupList.getLogGroupsList();
        for (PutLogRequest.LogGroup logGroup: logGroups) {
            List<PutLogRequest.Log> logs = logGroup.getLogsList();
            for (PutLogRequest.Log log: logs) {
                count++;
                System.out.println("*** Count = " + count + " ***");
                List<PutLogRequest.LogContent> logContents = log.getContentsList();
                for (PutLogRequest.LogContent logContent: logContents) {
                    System.out.println(logContent.getKey() + ": " + logContent.getValue());
                }
                System.out.println();
            }
        }
    }
}

最近更新时间:2024.02.27 14:30:45
这个页面对您有帮助吗?
有用
有用
无用
无用