You need to enable JavaScript to run this app.
导航
通过 Kafka 协议消费日志
最近更新时间:2024.11.05 10:35:54首次发布时间:2022.09.15 14:39:24

日志服务提供 Kafka 协议消费功能,即可以将一个日志主题,当作一个 Kafka Topic 来消费。本文档介绍通过 Kafka 协议消费日志数据的相关步骤。

背景信息

日志服务支持为指定的日志主题开启 Kafka 协议消费功能,开启后,可以将日志主题作为 Kafka 的 Topic 进行消费,每条日志对应一条 Kafka 消息。在实际的业务场景中,通过开源 Kafka SDK 成功对接日志服务后,可以使用 Kafka Consumer 将采集到指定日志主题的日志数据消费到下游的大数据组件或者数据仓库,适用于流式计算或大数据存储场景。
通过 Kafka 协议消费日志时,支持消费者或消费组形式消费;不支持跨日志项目进行消费。

限制说明

  • Kafka 协议消费功能支持的 Kafka Client 版本为 0.11.x~2.0.x。
  • Kafka 协议消费功能为开启状态时,您可以消费 Kafka Consumer 运行期间采集到服务端的日志数据。
    • Consumer 首次启动前采集的日志数据不支持消费。
    • Consumer 短暂重启期间的日志数据可被消费,但消费中断 2 小时以后采集的日志数据不支持消费。
    • 供 Kafka 消费的日志数据在服务端的数据保留时间为 2 小时,2 小时后或关闭 Kafka 协议消费功能时会被删除。但有效期内的日志数据可以被持续消费。
  • 支持通过标准的开源 Kafka Java SDK 进行日志数据消费,消费日志的示例代码请参考示例代码。也可以使用 Spark Streaming 或 Flink 的 Kakfa 插件对接日志服务,详细说明请参考通过 Spark Streaming 消费日志通过 Flink 消费日志
  • 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥,详细信息请参考示例代码
  • 如果日志主题中有多个 Shard,日志服务不保证消费的有序性,建议使用负载均衡模式上传日志。

费用说明

消费日志时会产生私网或公网的读流量。价格信息请参考计费指引

  • 内网读流量:通过 Kafka 协议消费日志数据到火山引擎其他私网服务时,如果源日志主题和消费端属于同一地域,可以使用私网传输,此时会产生内网读流量费用。例如源数据在日志服务北京地域的某日志主题中,通过 Kafka 消费日志数据到 ECS 自建程序中,则需要支付对应私网流量的费用。
  • 公网读流量:通过 Kafka 协议消费日志数据到公网的外部程序时,必须使用公网传输数据,此时会产生公网读流量。例如源数据在日志服务某日志主题中,通过 Kafka 消费日志数据到自建 IDC 的自研程序,则会产生公网读流量。

说明

如果源日志主题和消费端属于不同地域,则必须使用公网传输,此时会产生公网读流量。

前提条件

  • 已开通日志服务,创建日志项目与日志主题,并成功采集到日志数据。详细说明请参考快速入门
  • 确保当前操作账号拥有开通 Kafka 协议消费的权限,即具备 Action ConsumeLogs 的权限。详细信息请参考可授权的操作

操作步骤

1 开启Kafka消费功能

使用各种工具通过 Kafka 协议消费日志数据之前,需要为指定日志主题开启 Kafka 消费功能。

  1. 登录日志服务控制台
  2. 在顶部导航栏中,选择日志服务所在的地域。
  3. 在左侧导航栏中,选择资源管理 > 日志项目
  4. 日志项目页面,找到指定的日志项目,并单击项目名称。
  5. 日志主题区域,找到指定的日志主题,并单击日志主题名称。
  6. Kafka协议消费区域中,打开对应的功能开关。
  7. 在弹出对话框中确认待开启 Kafka 协议消费功能的日志项目和日志主题,并单击确定
    成功开启Kafka协议消费功能之后,此日志主题的详情页面会显示 Kafka协议消费主题ID

说明

  • 请记录并妥善保管Kafka协议消费主题ID。通过 Kafka 协议消费此日志主题中的日志数据时,Topic 应指定为此 ID。
  • 关闭 Kafka 协议消费功能 2 分钟后,才能再次开启该功能。

2 通过 Kafka 协议消费日志

目前日志服务支持通过 Kafka Java SDK 或 Spark、Flink 等框架的 Kafka Connector 插件进行日志数据消费,您可以参考下文配置 Kafka 的基本参数,并参考示例代码消费日志数据。

说明

Kafka 协议消费功能启动预计需要 30 秒左右,建议在开启功能 30 秒后开始消费日志。

参数说明

使用 Kafka 协议消费日志时,您需要配置以下参数。

参数

说明

连接类型

为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥,详细信息请参考示例代码

username

Kafka SASL 用户名。应配置为日志服务的日志项目 ID。

password

Kafka SASL 用户密码。应配置为火山引擎账户密钥。
格式为 ${access-key-id}#${access-key-secret},其中:

  • ${access-key-id} 应替换为您的 AccessKey ID。
  • ${access-key-secret} 应替换为您的 AccessKey Secret。

说明

建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action ConsumeLogs 的权限。详细信息请参考可授权的操作

hosts

初始连接的集群地址,格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093,其中:

  • 服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址
  • 端口号固定为 9093。

说明

hosts 中的服务地址部分无需指定 https://

topic

Kafka 协议消费主题 ID,格式为 out-日志主题ID,例如 out-0fdaa6b6-3c9f-424c-8664-fc0d222c****
您也可以在日志服务控制台的 Topic 详情页中查看并复制 Kafka 协议消费主题 IDImage

错误信息

使用 Kafka 协议上传日志失败时,会按照 Kafka 的错误码返回对应的错误信息,请参考 Kafka error list获取更多信息。
除此之外,日志服务还在 Java 语言 Kafka 错误码 SASLAuthenticationException 中封装了鉴权、配置相关参数的错误信息,详细说明如下:

错误信息

说明

invalid SASL/PLAIN request: expected 3 tokens

未配置 user 或者password。请参考配置方式正确填写配置。

invalid SASL/PLAIN request: empty projectId

未配置 user 字段。请参考配置方式正确填写配置。

invalid SASL/PLAIN request: Password format wrong

password 字段配置错误。请参考配置方式正确填写配置。

invalid SASL/PLAIN request: empty AccessKey

未配置 AK。请参考配置方式正确填写配置。

invalid SASL/PLAIN request: empty SecretKey

未配置 SK。请参考配置方式正确填写配置。

invalid SASL/PLAIN request: Invalid projectId

指定的 Project ID 不存在。请检查 Project ID 是否输入正确。

Not allow consume, please open kafka consumer

指定 Topic 没有开启消费功能。请参考开启 Kafka 消费功能操作。

Access Denied. You are not authorized to perform this operation.

当前用户不具备操作权限。建议检查用户权限。

示例代码

通过 Kafka Java SDK 消费日志

通过简单的参数配置,即可使用各类 Kafka Consumer SDK 消费日志服务中已采集到的数据。通过Kafka Java SDK 消费日志的相关依赖及示例代码如下:

  1. 添加依赖。
    在 pom 文件中添加 kafka-clients 的相关依赖。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
    </dependency>
    
  2. 消费日志。
    参考以下示例代码通过 Kafka Java SDK 消费日志。

    package org.kafka;
    
    import java.util.*;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    
    public class KafkaConsumeTest implements Runnable {
        
        private final KafkaConsumer<String, String> consumer;
        private ConsumerRecords<String, String> msgList;
        private final String topic;
        private static final String GROUPID = "MY_GROUP_ID";
        private KafkaAdminClient adminClient;
        private String consumeName;
        
        public KafkaConsumeTest(String topicName,String consumeName) {
            // consumeName
            this.consumeName = consumeName;
            // projectid
            String userName = "${projectId}";
            // 火山引擎账号的密钥,或具备对应权限的子账号密钥。不支持STS临时安全令牌。
            String passWord = "${access-key-id}#${access-key-secret}";
            Properties props = new Properties();
            props.put("bootstrap.servers", "${hosts}"); //消费的地址,具体见文档
            props.put("group.id", GROUPID);//消费组名称,长度为 1~256 个字符。
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule " +
                            "required username=\"" + userName + "\" password=\"" + passWord + "\";");
            
            this.consumer = new KafkaConsumer<String, String>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
            this.adminClient = (KafkaAdminClient) KafkaAdminClient.create(props);
        }
        
        @Override
        public void run() {
            int messageNo = 1;
            System.out.println("---------开始消费---------");
            try {
                while(true) {
                    msgList = consumer.poll(10);
                    
                    if(null!=msgList&&msgList.count()>0){
                        for (ConsumerRecord<String, String> record : msgList) {
                            System.out.println(this.consumeName+"==="+messageNo+" offset==="+record.offset()+"=======receive: key = " + record.key() + ", value = " +
                                    record.value());
                            messageNo++;
                        }
                    } else{
                        Thread.sleep(10);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
        
        public static void main(String args[]) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //${out-TopicID}为Kafka协议消费主题ID,格式为out+日志主题ID,例如"out-0fdaa6b6-3c9f-424c-8664-fc0d222c****"。 
            KafkaConsumeTest test1 = new KafkaConsumeTest("${out-TopicID}","Consume1");
            Thread thread1 = new Thread(test1);
            thread1.start();
            countDownLatch.await();
        }
    }
    

通过 Kafka Go SDK 消费日志

通过简单的参数配置,即可使用各类 Kafka Consumer SDK 消费日志服务中已采集到的数据。通过Kafka Go SDK 消费日志的相关依赖及示例代码如下:

  1. 执行以下命令安装 Sarama。

    go get github.com/Shopify/sarama@v1.38.1
    
  2. 导入 Sarama 等必要的依赖包,并消费日志。
    参考以下示例代码通过 Kafka Go SDK 消费日志。

    package kafka_consume_example
    
    import (
       "context"
       "fmt"
    
       "github.com/Shopify/sarama"
    )
    
    type exampleConsumerGroupHandler struct{}
    
    func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
    func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
    func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
       for msg := range claim.Messages() {
          fmt.Printf("Message topic:%q partition:%d offset:%d message:%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
          sess.MarkMessage(msg, "")
          sess.Commit()
       }
       return nil
    }
    
    func ExampleConsumerGroup() {
       config := sarama.NewConfig()
       config.Version = sarama.V2_0_0_0 // specify appropriate version
       config.ApiVersionsRequest = true
       config.Consumer.Return.Errors = true
       config.Net.SASL.Mechanism = "PLAIN"
       config.Net.SASL.Version = int16(0)
       config.Net.SASL.Enable = true
       config.Net.TLS.Enable = true
       config.Consumer.Offsets.Initial = sarama.OffsetNewest
       // projectId
       config.Net.SASL.User = "${projectId}"
       // 火山引擎账号的密钥,或具备对应权限的子账号密钥。不支持STS临时安全令牌。
       config.Net.SASL.Password = "${access-key-id}#${access-key-secret}"
       // host为消费的地址,具体见文档;my-group 为消费组名称,请根据实际值替换,长度为 1~256 个字符。
       group, err := sarama.NewConsumerGroup([]string{"${hosts}"}, "my-group", config)
       if err != nil {
          panic(err)
       }
       defer func() { _ = group.Close() }()
    
       // Track errors
       go func() {
          for err := range group.Errors() {
             fmt.Println("ERROR", err)
          }
       }()
    
       // Iterate over consumer sessions.
       ctx := context.Background()
       for {
          // ${out-TopicID}为Kafka协议消费主题ID,格式为out+日志主题ID,例如"out-0fdaa6b6-3c9f-424c-8664-fc0d222c****"。
          topics := []string{"${out-TopicID}"}
          handler := exampleConsumerGroupHandler{}
    
          // `Consume` should be called inside an infinite loop, when a
          // server-side rebalance happens, the consumer session will need to be
          // recreated to get the new claims
          err := group.Consume(ctx, topics, handler)
          if err != nil {
             panic(err)
          }
       }
    }
    

相关操作

查看消费组

说明

邀测功能,若有业务需求可联系客户经理申请白名单。

通过 Kafka Java SDK 或 Spark、Flink 等框架的 Kafka Connector 插件进行日志数据消费后,您可以在日志服务控制台上查看消费组信息。

  1. 登录日志服务控制台
  2. 在顶部导航栏中,选择日志服务所在的地域。
  3. 在左侧导航栏中,选择数据处理 > 日志消费
  4. Kafka 页签下,找到并展开目标 Kafka 消费组所属的日志项目,然后单击指定的 Kafka 消费组。
  5. 查看目标消费组对应的消费信息。
    Image

重置消费位点

说明

邀测功能,若有业务需求可联系客户经理申请白名单。

通过 Kafka Java SDK 或 Spark、Flink 等框架的 Kafka Connector 插件进行日志数据消费后,您可以在日志服务控制台上重置消费位点。您还可以通过重置消费位点实现历史数据的消费。

  1. 登录日志服务控制台
  2. 在顶部导航栏中,选择日志服务所在的地域。
  3. 在左侧导航栏中,选择数据处理 > 日志消费
  4. Kafka 页签下,找到并展开目标 Kafka 消费组所属的日志项目,然后单击指定的 Kafka 消费组。
  5. 重置消费位点。
    1. 日志主题列表页签中的右上角单击重置消费位点,或者在指定 Shard 对应的操作列单击重置消费位点

    2. 选择重置位置。

      重置位置

      说明

      最早位置

      从日志主题上最早的一条数据开始消费。

      最新位置

      跳过所有历史数据,直接从日志主题上最近写入的一条数据开始消费。

      指定时间点

      从过去某个指定时间点开始消费,可选的时间范围取决于日志主题的日志存储时长。

    3. 单击确定

查看监控看板

说明

邀测功能,若有业务需求可联系客户经理申请白名单。

通过 Kafka Java SDK 或 Spark、Flink 等框架的 Kafka Connector 插件进行日志数据消费后,您可以在日志服务控制台上查看消费组延迟和消费速度的变化趋势。

  1. 登录日志服务控制台
  2. 在顶部导航栏中,选择日志服务所在的地域。
  3. 在左侧导航栏中,选择数据处理 > 日志消费
  4. Kafka 页签下,找到并展开目标 Kafka 消费组所属的日志项目,然后单击指定的 Kafka 消费组。
  5. 监控看板页签中,查看消费相关的仪表盘。
    监控看板中展示了该 Kafka 消费组的消费延迟、消费速度以及相关的变化趋势。
    Image