日志服务提供 Kafka 协议消费功能,您可以使用 Flink 的 flink-connector-kafka 插件对接日志服务,将日志服务中的数据消费到下游的大数据组件或者数据仓库。
Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink 提供了 Apache Kafka 连接器(flink-connector-kafka)在 Kafka topic 中读取和写入数据。日志服务支持为指定的日志主题开启 Kafka 协议消费功能,开启后,Flink 可以将日志主题作为 Kafka 的 Topic 进行消费,例如消费到下游的大数据组件或者数据仓库,适用于流式计算或大数据存储场景。
Apache Flink 内置了多个 Kafka client,不同 Flink 发行版之间其使用的客户端版本可能会发生改变。目前日志服务仅支持 0.11.x 及以上的 Kafka client 连接,对应的 Flink 版本为 1.7.x 及以上,如果使用 1.7.x 以下的 Flink,需要手动指定 Kafka client的版本为 0.11.x 及以上。
通过 Flink 消费火山引擎日志服务的日志数据时,需要使用 Flink 提供的 flink-connector-kafka 作为 Connctor。
在项目中添加 flink-connector-kafka 相关的 Maven 依赖。
依赖的详细信息如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.16.0</version> </dependency>
Kafka Source 提供了构建类来创建 KafkaSource
的实例。其使用方法和实现细节请参考 Flink。
在构建 KafkaSource 时必须通过以下方法指定基础属性。
方法 | 说明 |
---|---|
setBootstrapServers() | Bootstrap server,应配置为初始连接的集群地址。格式为
|
setTopics() | 要订阅的 Kafka Topic。此处应配置为日志服务的 Kafka 协议消费主题 ID,格式为 |
setGroupId() | 消费者组 ID。 |
setValueOnlyDeserializer() | 用于解析 Kafka 消息的反序列化器(Deserializer),详细信息请参考消息解析。 |
setProperty() | 安全模式、授权模式等设置。应指定以下配置:
|
以下代码片段展示了如何构建 KafkaSource 来消费日志主题 “0fdaa6b6-3c9f-424c-8664-fc0d222c****
” 中的日志数据。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Consuming data from TLS kafka KafkaSource<String> tlsKafkaSource = KafkaSource.<String>builder() //初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093,其中: //服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址。 //端口号固定为 9093。 .setBootstrapServers(tls-cn-beijing.ivolces.com:9093) //要消费的topic,格式为out-xxxxxxxxx .setTopics(out-0fdaa6b6-3c9f-424c-8664-fc0d222c****) //消费组配置 .setGroupId(consumeGroupID) .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) //安全模式,tls只支持SASL_SSL .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL") //鉴权模式,tls只支持PLAIN .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") //账号密码设置: //username:用户名,应配置为日志服务的日志项目ID //password:密码,应配置为火山引擎账户密钥,格式为 ${access-key-id}#${access-key-secret}。其中: //${access-key-id} 应替换为您的 AccessKey ID。 //${access-key-secret} 应替换为您的 AccessKey Secret。 .setProperty(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule " + "required username=\"" + bd4d5b20-479a-4f33-9099-a44db3be**** + "\" password=\"" + AK****NWE2MTI#WldWa1p****dZd05JKJKHHUI== + "\";") .build(); DataStreamSource<String> data = env.fromSource(tlsKafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source"); // todo 自定义sink,需要用户自己实现自己的sink Sink<String> customizedSink = CreateSink(); data.sinkTo(customizedSink); env.execute("Flink Tls Kafka Job");