日志服务提供 Kafka 协议消费功能,您可以使用 spark-streaming-kafka 组件对接日志服务,将日志服务中的数据消费到下游的大数据组件或数据仓库。
Spark Streaming 是构建在 Spark 上的实时计算框架,在 Spark 的基础上提供了可拓展、高吞吐、容错的流计算能力。Spark Streaming 可整合多种数据源,例如通过 spark-streaming-kafka 组件整合 Kafka,实现消费 Kafka 消息的能力。日志服务支持为指定的日志主题开启 Kafka 协议消费功能,开启后,Spark Streaming 可以将日志主题作为 Kafka 的 Topic 进行消费,例如消费到下游的大数据组件或者数据仓库,适用于流式计算或大数据存储场景。
通过 Spark Streaming 消费火山引擎日志服务的日志数据时,需要使用 Spark Streaming 提供的 spark-streaming-kafka-0-10。
在项目中添加 spark-streaming-kafka 相关的 Maven 依赖。依赖的详细信息如下:
说明
日志服务 Kafka 消费功能仅支持 0.11.0 及以上的 Kafka 协议版本,需要手动指定 Spark 的 Kafka client 版本为 0.11.0及以上。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <!--版本大于等于0.11.0即可 --> <version>1.0.0</version> </dependency>
参考以下示例代码完成 Spark input stream 的相关配置。详细的配置说明请参考 Spark 官方文档,参数说明请参考下表。
以下示例展示了如何构建 Spark input stream 来消费日志主题 “0fdaa6b6-3c9f-424c-8664-fc0d222c****
” 中的日志数据。
// 构建SparkStreaming上下文 SparkConf conf = new SparkConf().setAppName("TlsDemo").setMaster("local").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses((Class<?>[]) Arrays.asList(ConsumerRecord.class).toArray()); // 每隔5秒钟,sparkStreaming作业就会收集最近5秒内的数据源接收过来的数据 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); // 构建kafka参数map // 主要要放置的是连接的kafka集群的地址(broker集群的地址列表) Map<String, Object> kafkaParams = new HashMap<>(); //初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093,其中: //服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址。 //端口号固定为 9093。 kafkaParams.put("bootstrap.servers", tlsEndConsumePoint); //指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8) kafkaParams.put("key.deserializer", StringDeserializer.class); //指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8) kafkaParams.put("value.deserializer", StringDeserializer.class); //消费组ID,随意指定 kafkaParams.put("group.id", consumeGroupID); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); //安全模式,tls只支持SASL_SSL kafkaParams.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); //鉴权模式,tls只支持PLAIN kafkaParams.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); //账号密码设置: //用户名:应配置为日志服务的日志项目ID //密码:应配置为火山引擎账户密钥 // 格式为 ${access-key-id}#${access-key-secret},其中: //${access-key-id} 应替换为您的 AccessKey ID。 //${access-key-secret} 应替换为您的 AccessKey Secret。 kafkaParams.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule " + "required username=\"" + bd4d5b20-479a-4f33-9099-a44db3be**** + "\" password=\"" + AK****NWE2MTI#WldWa1p****dZd05JKJKHHUI== + "\";"); // 构建topic set Collection<String> topics = new HashSet<>(); //要消费的topic,格式为out-xxxxxxxxx topics.add(out-0fdaa6b6-3c9f-424c-8664-fc0d222c****); try { // 获取kafka的数据 final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams)); // todo 消费到下游大数据组件 stream.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } catch (Exception e) { e.printStackTrace(); }
其中,必选的设置如下:
配置项 | 说明 |
---|---|
bootstrap.servers | 初始连接的集群地址。格式为
|
consumeTopic | 要订阅的 Kafka Topic。此处应配置为日志服务的 Kafka 协议消费主题 ID,格式为 |
SECURITY_PROTOCOL_CONFIG | 安全模式,应指定为 SASL_SSL。 |
SASL_MECHANISM | 鉴权机制,指定为 PLAIN。 |
SASL_JAAS_CONFIG | 当前登录用户的鉴权信息。其中:
|