You need to enable JavaScript to run this app.
导航
通过 Spark Streaming 消费日志
最近更新时间:2024.07.01 15:03:07首次发布时间:2023.02.13 18:01:42

日志服务提供 Kafka 协议消费功能,您可以使用 spark-streaming-kafka 组件对接日志服务,将日志服务中的数据消费到下游的大数据组件或数据仓库。

场景概述

Spark Streaming 是构建在 Spark 上的实时计算框架,在 Spark 的基础上提供了可拓展、高吞吐、容错的流计算能力。Spark Streaming 可整合多种数据源,例如通过 spark-streaming-kafka 组件整合 Kafka,实现消费 Kafka 消息的能力。日志服务支持为指定的日志主题开启 Kafka 协议消费功能,开启后,Spark Streaming 可以将日志主题作为 Kafka 的 Topic 进行消费,例如消费到下游的大数据组件或者数据仓库,适用于流式计算或大数据存储场景。

前提条件

  • 已创建日志项目和日志主题。详细操作步骤请参考创建资源
  • 已为指定日志主题开启 Kafka 协议消费功能,并获取Kafka协议消费主题ID。详细说明请参考通过 Kafka 协议消费日志
  • 推荐使用 IAM 用户进行访问鉴权。使用 IAM 用户前,需确认火山引擎主账号已创建 IAM 用户,且已为其授予消费相关的权限。详细说明请参考可授予的权限
  • 已获取当前登录账号的密钥 Access Key。详细信息请参考创建密钥

配置步骤

1 添加 Maven 依赖

通过 Spark Streaming 消费火山引擎日志服务的日志数据时,需要使用 Spark Streaming 提供的 spark-streaming-kafka-0-10。
在项目中添加 spark-streaming-kafka 相关的 Maven 依赖。依赖的详细信息如下:

说明

日志服务 Kafka 消费功能仅支持 0.11.0 及以上的 Kafka 协议版本,需要手动指定 Spark 的 Kafka client 版本为 0.11.0及以上。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!--版本大于等于0.11.0即可 -->
    <version>1.0.0</version>
</dependency>

2 配置 Spark input stream

参考以下示例代码完成 Spark input stream 的相关配置。详细的配置说明请参考 Spark 官方文档,参数说明请参考下表。
以下示例展示了如何构建 Spark input stream 来消费日志主题 “0fdaa6b6-3c9f-424c-8664-fc0d222c****” 中的日志数据。

// 构建SparkStreaming上下文
SparkConf conf = new SparkConf().setAppName("TlsDemo").setMaster("local").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses((Class<?>[]) Arrays.asList(ConsumerRecord.class).toArray());

// 每隔5秒钟,sparkStreaming作业就会收集最近5秒内的数据源接收过来的数据
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

// 构建kafka参数map
// 主要要放置的是连接的kafka集群的地址(broker集群的地址列表)
Map<String, Object> kafkaParams = new HashMap<>();
//初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093,其中:
//服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址。
//端口号固定为 9093。
kafkaParams.put("bootstrap.servers", tlsEndConsumePoint);
//指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
kafkaParams.put("key.deserializer", StringDeserializer.class);
//指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
kafkaParams.put("value.deserializer", StringDeserializer.class);
//消费组ID,随意指定
kafkaParams.put("group.id", consumeGroupID);
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
//安全模式,tls只支持SASL_SSL
kafkaParams.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
//鉴权模式,tls只支持PLAIN
kafkaParams.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//账号密码设置:
//用户名:应配置为日志服务的日志项目ID
//密码:应配置为火山引擎账户密钥
// 格式为 ${access-key-id}#${access-key-secret},其中:
//${access-key-id} 应替换为您的 AccessKey ID。
//${access-key-secret} 应替换为您的 AccessKey Secret。
kafkaParams.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username=\"" + bd4d5b20-479a-4f33-9099-a44db3be**** + "\" password=\"" + AK****NWE2MTI#WldWa1p****dZd05JKJKHHUI== + "\";");

// 构建topic set
Collection<String> topics = new HashSet<>();
//要消费的topic,格式为out-xxxxxxxxx
topics.add(out-0fdaa6b6-3c9f-424c-8664-fc0d222c****);

try {
    // 获取kafka的数据
    final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topics, kafkaParams));
            
    // todo 消费到下游大数据组件
    stream.print();
    jssc.start();
    jssc.awaitTermination();
    jssc.close();

} catch (Exception e) {
    e.printStackTrace();
}

其中,必选的设置如下:

配置项

说明

bootstrap.servers

初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093。

  • 服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务地址,详细信息请参见服务地址
  • 端口号固定为 9093。

consumeTopic

要订阅的 Kafka Topic。此处应配置为日志服务的 Kafka 协议消费主题 ID,格式为 out-日志主题ID,例如 out-0fdaa6b6-3c9f-424c-8664-fc0d222c****。 您可以在日志服务控制台的 Topic 详情页中查看并复制 Kafka 协议消费主题 ID

SECURITY_PROTOCOL_CONFIG

安全模式,应指定为 SASL_SSL。

SASL_MECHANISM

鉴权机制,指定为 PLAIN。

SASL_JAAS_CONFIG

当前登录用户的鉴权信息。其中:

  • useranme:Kafka SASL 用户名。应配置为日志服务的日志项目 ID。
  • password:Kafka SASL 用户密码。应配置为火山引擎账户密钥。格式为 ${access-key-id}#${access-key-secret},其中:
    • ${access-key-id} 应替换为您的 AccessKey ID。
    • ${access-key-secret} 应替换为您的 AccessKey Secret。