日志服务提供 Kafka 协议消费功能,即可以将一个日志主题,当作一个 Kafka Topic 来消费。本文档介绍通过 Kafka 协议消费日志数据的相关步骤。
日志服务支持为指定的日志主题开启 Kafka 协议消费功能,开启后,可以将日志主题作为 Kafka 的 Topic 进行消费,每条日志对应一条 Kafka 消息。在实际的业务场景中,通过开源 Kafka SDK 成功对接日志服务后,可以使用 Kafka Consumer 将采集到指定日志主题的日志数据消费到下游的大数据组件或者数据仓库,适用于流式计算或大数据存储场景。
通过 Kafka 协议消费日志时,支持消费者或消费组形式消费;不支持跨日志项目进行消费。
消费日志时会产生私网或公网的读流量。价格信息请参考计费指引。
说明
如果源日志主题和消费端属于不同地域,则必须使用公网传输,此时会产生公网读流量。
ConsumeLogs
的权限。详细信息请参考可授权的操作。使用各种工具通过 Kafka 协议消费日志数据之前,需要为指定日志主题开启 Kafka 消费功能。
说明
目前日志服务支持通过 Kafka Java SDK 或 Spark、Flink 等框架的 Kafka Connector 插件进行日志数据消费,您可以参考下文配置 Kafka 的基本参数,并参考示例代码消费日志数据。
说明
Kafka 协议消费功能启动预计需要 30 秒左右,建议在开启功能 30 秒后开始消费日志。
使用 Kafka 协议消费日志时,您需要配置以下参数。
参数 | 说明 |
---|---|
连接类型 | 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥,详细信息请参考示例代码。 |
username | Kafka SASL 用户名。应配置为日志服务的日志项目 ID。 |
password | Kafka SASL 用户密码。应配置为火山引擎账户密钥。
说明 建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action |
hosts | 初始连接的集群地址,格式为
说明 hosts 中的服务地址部分无需指定 |
topic | Kafka 协议消费主题 ID,格式为 |
使用 Kafka 协议上传日志失败时,会按照 Kafka 的错误码返回对应的错误信息,请参考 Kafka error list获取更多信息。
除此之外,日志服务还在 Java 语言 Kafka 错误码 SASLAuthenticationException
中封装了鉴权、配置相关参数的错误信息,详细说明如下:
错误信息 | 说明 |
---|---|
invalid SASL/PLAIN request: expected 3 tokens | 未配置 user 或者password。请参考配置方式正确填写配置。 |
invalid SASL/PLAIN request: empty projectId | 未配置 user 字段。请参考配置方式正确填写配置。 |
invalid SASL/PLAIN request: Password format wrong | password 字段配置错误。请参考配置方式正确填写配置。 |
invalid SASL/PLAIN request: empty AccessKey | 未配置 AK。请参考配置方式正确填写配置。 |
invalid SASL/PLAIN request: empty SecretKey | 未配置 SK。请参考配置方式正确填写配置。 |
invalid SASL/PLAIN request: Invalid projectId | 指定的 Project ID 不存在。请检查 Project ID 是否输入正确。 |
Not allow consume, please open kafka consumer | 指定 Topic 没有开启消费功能。请参考开启 Kafka 消费功能操作。 |
Access Denied. You are not authorized to perform this operation. | 当前用户不具备操作权限。建议检查用户权限。 |
通过简单的参数配置,即可使用各类 Kafka Consumer SDK 消费日志服务中已采集到的数据。通过Kafka Java SDK 消费日志的相关依赖及示例代码如下:
添加依赖。
在 pom 文件中添加 kafka-clients 的相关依赖。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.2</version> </dependency>
消费日志。
参考以下示例代码通过 Kafka Java SDK 消费日志。
package org.kafka; import java.util.*; import java.util.concurrent.CountDownLatch; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringDeserializer; public class KafkaConsumeTest implements Runnable { private final KafkaConsumer<String, String> consumer; private ConsumerRecords<String, String> msgList; private final String topic; private static final String GROUPID = "MY_GROUP_ID"; private KafkaAdminClient adminClient; private String consumeName; public KafkaConsumeTest(String topicName,String consumeName) { // consumeName this.consumeName = consumeName; // projectid String userName = "${projectId}"; // 火山引擎账号的密钥,或具备对应权限的子账号密钥。不支持STS临时安全令牌。 String passWord = "${access-key-id}#${access-key-secret}"; Properties props = new Properties(); props.put("bootstrap.servers", "${hosts}"); //消费的地址,具体见文档 props.put("group.id", GROUPID);//消费组名称,长度为 1~256 个字符。 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule " + "required username=\"" + userName + "\" password=\"" + passWord + "\";"); this.consumer = new KafkaConsumer<String, String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); this.adminClient = (KafkaAdminClient) KafkaAdminClient.create(props); } @Override public void run() { int messageNo = 1; System.out.println("---------开始消费---------"); try { while(true) { msgList = consumer.poll(10); if(null!=msgList&&msgList.count()>0){ for (ConsumerRecord<String, String> record : msgList) { System.out.println(this.consumeName+"==="+messageNo+" offset==="+record.offset()+"=======receive: key = " + record.key() + ", value = " + record.value()); messageNo++; } } else{ Thread.sleep(10); } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } public static void main(String args[]) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); //${out-TopicID}为Kafka协议消费主题ID,格式为out+日志主题ID,例如"out-0fdaa6b6-3c9f-424c-8664-fc0d222c****"。 KafkaConsumeTest test1 = new KafkaConsumeTest("${out-TopicID}","Consume1"); Thread thread1 = new Thread(test1); thread1.start(); countDownLatch.await(); } }
通过简单的参数配置,即可使用各类 Kafka Consumer SDK 消费日志服务中已采集到的数据。通过Kafka Go SDK 消费日志的相关依赖及示例代码如下:
执行以下命令安装 Sarama。
go get github.com/Shopify/sarama@v1.38.1
导入 Sarama 等必要的依赖包,并消费日志。
参考以下示例代码通过 Kafka Go SDK 消费日志。
package kafka_consume_example import ( "context" "fmt" "github.com/Shopify/sarama" ) type exampleConsumerGroupHandler struct{} func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { fmt.Printf("Message topic:%q partition:%d offset:%d message:%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value) sess.MarkMessage(msg, "") sess.Commit() } return nil } func ExampleConsumerGroup() { config := sarama.NewConfig() config.Version = sarama.V2_0_0_0 // specify appropriate version config.ApiVersionsRequest = true config.Consumer.Return.Errors = true config.Net.SASL.Mechanism = "PLAIN" config.Net.SASL.Version = int16(0) config.Net.SASL.Enable = true config.Net.TLS.Enable = true config.Consumer.Offsets.Initial = sarama.OffsetNewest // projectId config.Net.SASL.User = "${projectId}" // 火山引擎账号的密钥,或具备对应权限的子账号密钥。不支持STS临时安全令牌。 config.Net.SASL.Password = "${access-key-id}#${access-key-secret}" // host为消费的地址,具体见文档;my-group 为消费组名称,请根据实际值替换,长度为 1~256 个字符。 group, err := sarama.NewConsumerGroup([]string{"${hosts}"}, "my-group", config) if err != nil { panic(err) } defer func() { _ = group.Close() }() // Track errors go func() { for err := range group.Errors() { fmt.Println("ERROR", err) } }() // Iterate over consumer sessions. ctx := context.Background() for { // ${out-TopicID}为Kafka协议消费主题ID,格式为out+日志主题ID,例如"out-0fdaa6b6-3c9f-424c-8664-fc0d222c****"。 topics := []string{"${out-TopicID}"} handler := exampleConsumerGroupHandler{} // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims err := group.Consume(ctx, topics, handler) if err != nil { panic(err) } } }
通过 Kafka Java SDK 或 Spark、Flink 等框架的 Kafka Connector 插件进行日志数据消费后,您可以在日志服务控制台上查看消费组信息。
说明
重置消费位点之前请先关闭进程。
通过 Kafka Java SDK 或 Spark、Flink 等框架的 Kafka Connector 插件进行日志数据消费后,您可以在日志服务控制台上重置消费位点。您还可以通过重置消费位点实现历史数据的消费。
在日志主题列表页签中的右上角单击重置消费位点,或者在指定 Shard 对应的操作列单击重置消费位点。
选择重置位置。
重置位置 | 说明 |
---|---|
最早位置 | 从日志主题上最早的一条数据开始消费。 |
最新位置 | 跳过所有历史数据,直接从日志主题上最近写入的一条数据开始消费。 |
指定时间点 | 从过去某个指定时间点开始消费,可选的时间范围取决于日志主题的日志存储时长。 |
单击确定。
通过 Kafka Java SDK 或 Spark、Flink 等框架的 Kafka Connector 插件进行日志数据消费后,您可以在日志服务控制台上查看消费组延迟和消费速度的变化趋势。