日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据。本文档介绍如何使用 Java SDK 消费组消费日志。
日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard,您无需关注消费组的内部调度细节及消费者之间的负载均衡、故障转移等,只需要专注于业务逻辑。
关于消费组消费日志数据的基本概念等背景信息,请参考通过消费组消费数据。
说明
日志服务 SDK 消费组实现了请求失败自动重试、消费进度检查点自动上报等机制。因此,您仅需要关注于如何处理每次消费得到的 LogGroupList 的业务逻辑实现即可。
Java SDK 中,ConsumerConfig 类的构造函数返回了Java SDK 消费组的默认配置 config,config 中应配置 endpoint、region、accessKeyID、accessKeySecret等基本信息、日志项目 ID 和日志主题 ID 列表、消费组名称和消费者名称。
除此之外,您还可通过 ConsumerConfig 其他字段的 setter 方法进行额外的自定义配置。ConsumerConfig 支持的参数如下:
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
maxFetchLogGroupCount | int | 100 | 消费者单次消费日志时,获取的最大 LogGroup 数量,默认为 100,最大为 1000。 |
heartbeatIntervalInSecond | int | 20 | Consumer 心跳上报时间间隔,单位为秒。 |
dataFetchIntervalInMillisecond | int | 200 | Consumer 消费日志时间间隔,单位为毫秒。 |
flushCheckpointIntervalInSecond | int | 5 | Consumer 上传消费进度的时间间隔,单位为秒。 |
consumeFrom | String | begin | 开始消费时的默认消费位点,与 DescribeCursor 的 From 参数一致。仅在该消费者从未上传过消费位点时有效。 |
orderedConsume | boolean | false | 是否开启顺序消费。开启顺序消费后,消费者会根据 Shard 分裂的父子关系进行消费。 |
以下代码以 Java SDK 为例,演示通过 SDK 创建消费组和消费者,并消费日志的整体流程。
package com.volcengine.example.tls.demo; import java.util.ArrayList; import java.util.List; import com.volcengine.model.tls.consumer.ConsumerConfig; import com.volcengine.model.tls.exception.LogException; import com.volcengine.model.tls.pb.PutLogRequest; import com.volcengine.service.tls.consumer.Consumer; import com.volcengine.service.tls.consumer.ConsumerImpl; import com.volcengine.service.tls.consumer.LogProcessor; // 您需要定义一个实现LogProcessor接口的类 public class ConsumerDemo implements LogProcessor { public static void main(String[] args) throws LogException, InterruptedException { // 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455 // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空 ConsumerConfig config = new ConsumerConfig(System.getenv("VOLCENGINE_ENDPOINT"), System.getenv("VOLCENGINE_REGION"), System.getenv("VOLCENGINE_ACCESS_KEY_ID"), System.getenv("VOLCENGINE_ACCESS_KEY_SECRET"), System.getenv("VOLCENGINE_TOKEN")); // 请配置您的日志项目ID config.setProjectID("your-project-id"); // 请配置您待消费的日志主题ID列表 config.setTopicIDList(new ArrayList<String>(){{ add("your-topic-id"); }}); // 请配置您的消费组名称 config.setConsumerGroupName("java-consumer-group"); // 请配置消费者名称 config.setConsumerName("java-consumer"); // 实例化ConsumerImpl,调用consumer.start()开始持续消费 Consumer consumer = new ConsumerImpl(config, new ConsumerDemo()); consumer.start(); // 可通过调用consumer.stop()来结束消费组消费 Thread.sleep(10000); consumer.stop(); } /** * 您需要根据业务需要,自行实现这里的process方法,用于处理每次消费得到的LogGroupList * 下面给出了逐个打印消费到的日志的代码示例 */ @Override public void process(String topicID, int shardID, PutLogRequest.LogGroupList logGroupList) { System.out.println(topicID + " --- " + shardID); System.out.println(logGroupList.getLogGroupsCount()); int count = 0; List<PutLogRequest.LogGroup> logGroups = logGroupList.getLogGroupsList(); for (PutLogRequest.LogGroup logGroup: logGroups) { List<PutLogRequest.Log> logs = logGroup.getLogsList(); for (PutLogRequest.Log log: logs) { count++; System.out.println("*** Count = " + count + " ***"); List<PutLogRequest.LogContent> logContents = log.getContentsList(); for (PutLogRequest.LogContent logContent: logContents) { System.out.println(logContent.getKey() + ": " + logContent.getValue()); } System.out.println(); } } } }