日志服务支持通过 SDK 消费采集到服务端的日志数据。本文档通过示例代码演示如何通过 Java SDK 消费日志。
注意
推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。
本文档通过示例代码演示如何通过 SDK 消费日志数据。Java SDK 支持通过以下方式写入日志:
写入方式 | 说明 |
---|---|
Consumer | 推荐。 |
ConsumeLogs | 不推荐。 |
通过 Java Consumer 消费日志数据的示例代码如下。
package com.volcengine.example.tls.demo;
import java.util.ArrayList;
import java.util.List;
import com.volcengine.model.tls.consumer.ConsumerConfig;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.service.tls.consumer.Consumer;
import com.volcengine.service.tls.consumer.ConsumerImpl;
import com.volcengine.service.tls.consumer.LogProcessor;
// 您需要定义一个实现LogProcessor接口的类
public class ConsumerDemo implements LogProcessor {
public static void main(String[] args) throws LogException, InterruptedException {
// 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455
// 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空
ConsumerConfig config = new ConsumerConfig(System.getenv("VOLCENGINE_ENDPOINT"), System.getenv("VOLCENGINE_REGION"),
System.getenv("VOLCENGINE_ACCESS_KEY_ID"), System.getenv("VOLCENGINE_ACCESS_KEY_SECRET"), System.getenv("VOLCENGINE_TOKEN"));
// 请配置您的日志项目ID
config.setProjectID("your-project-id");
// 请配置您待消费的日志主题ID列表
config.setTopicIDList(new ArrayList<String>(){{
add("your-topic-id");
}});
// 请配置您的消费组名称
config.setConsumerGroupName("java-consumer-group");
// 请配置消费者名称
config.setConsumerName("java-consumer");
// 实例化ConsumerImpl,调用consumer.start()开始持续消费
Consumer consumer = new ConsumerImpl(config, new ConsumerDemo());
consumer.start();
// 可通过调用consumer.stop()来结束消费组消费
Thread.sleep(10000);
consumer.stop();
}
/**
* 您需要根据业务需要,自行实现这里的process方法,用于处理每次消费得到的LogGroupList
* 下面给出了逐个打印消费到的日志的代码示例
*/
@Override
public void process(String topicID, int shardID, PutLogRequest.LogGroupList logGroupList) {
System.out.println(topicID + " --- " + shardID);
System.out.println(logGroupList.getLogGroupsCount());
int count = 0;
List<PutLogRequest.LogGroup> logGroups = logGroupList.getLogGroupsList();
for (PutLogRequest.LogGroup logGroup: logGroups) {
List<PutLogRequest.Log> logs = logGroup.getLogsList();
for (PutLogRequest.Log log: logs) {
count++;
System.out.println("*** Count = " + count + " ***");
List<PutLogRequest.LogContent> logContents = log.getContentsList();
for (PutLogRequest.LogContent logContent: logContents) {
System.out.println(logContent.getKey() + ": " + logContent.getValue());
}
System.out.println();
}
}
}
}
通过调用 ConsumeLogs 同步接口消费日志数据的示例代码如下。
package com.volcengine.example.tls.demo;
import com.volcengine.model.tls.*;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.model.tls.request.*;
import com.volcengine.model.tls.response.*;
import com.volcengine.service.tls.TLSLogClient;
import java.util.ArrayList;
import java.util.List;
public class Demo {
public static void main(String[] args) throws LogException {
// 初始化SDK配置,请您根据账号和服务信息配置endpoint、region、acccessKeyID、accessKeySecret和token(token可为null)
ClientConfig clientConfig = new ClientConfig(System.getenv("VOLCENGINE_ENDPOINT"), System.getenv("VOLCENGINE_REGION"),
System.getenv("VOLCENGINE_ACCESS_KEY_ID"), System.getenv("VOLCENGINE_ACCESS_KEY_SECRET"), System.getenv("VOLCENGINE_TOKEN"));
TLSLogClient client = ClientBuilder.newClient(clientConfig);
// 请填写您希望消费日志的TopicID和shardID
String topicID = "your-topic-id";
int shardID = 0;
// 获取日志消费的起始游标
// DescribeCursor API的请求参数规范和限制请参阅https://www.volcengine.com/docs/6470/112193
DescribeCursorRequest describeCursorRequest = new DescribeCursorRequest(topicID, shardID, "begin");
DescribeCursorResponse describeCursorResponse = client.describeCursor(describeCursorRequest);
String beginCursor = describeCursorResponse.getCursor();
// 消费日志数据
// 请根据您的需要,填写TopicId、ShardId、Cursor、LogGroupCount、Compression等参数,推荐您使用lz4压缩
// 您可再次调用DescribeCursor接口获取日志消费的结束游标,作为ConsumeLogs接口的EndCursor参数值
// ConsumeLogs API的请求参数规范和限制请参阅https://www.volcengine.com/docs/6470/112194
ConsumeLogsRequest consumeLogsRequest = new ConsumeLogsRequest();
consumeLogsRequest.setTopicId(topicID);
consumeLogsRequest.setShardId(shardID);
consumeLogsRequest.setCursor(beginCursor);
consumeLogsRequest.setLogGroupCount(1000);
consumeLogsRequest.setCompression("lz4");
ConsumeLogsResponse consumeLogsResponse = client.consumeLogs(consumeLogsRequest);
PutLogRequest.LogGroupList logGroupList = consumeLogsResponse.getLogGroupList();
}
}