You need to enable JavaScript to run this app.
导航
通过 Flink 消费日志
最近更新时间:2025.01.06 16:08:32首次发布时间:2023.02.13 18:01:42

日志服务提供 Kafka 协议消费功能,您可以使用 Flink 的 flink-connector-kafka 插件对接日志服务,将日志服务中的数据消费到下游的大数据组件或者数据仓库。

场景概述

Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink 提供了 Apache Kafka 连接器(flink-connector-kafka)在 Kafka topic 中读取和写入数据。日志服务支持为指定的日志主题开启 Kafka 协议消费功能,开启后,Flink 可以将日志主题作为 Kafka 的 Topic 进行消费,例如消费到下游的大数据组件或者数据仓库,适用于流式计算或大数据存储场景。

前提条件

  • 已创建日志项目和日志主题。详细操作步骤请参考创建资源
  • 已为指定日志主题开启 Kafka 协议消费功能,并获取Kafka协议消费主题ID。详细说明请参考通过 Kafka 协议消费日志
  • 推荐使用 IAM 用户进行访问鉴权。使用 IAM 用户前,需确认火山引擎主账号已创建 IAM 用户,且已为其授予消费相关的权限。详细说明请参考可授予的权限
  • 已获取当前登录账号的密钥 Access Key。详细信息请参考创建密钥

注意事项

Apache Flink 内置了多个 Kafka client,不同 Flink 发行版之间其使用的客户端版本可能会发生改变。目前日志服务仅支持 0.11.x 及以上的 Kafka client 连接,对应的 Flink 版本为 1.7.x 及以上,如果使用 1.7.x 以下的 Flink,需要手动指定 Kafka client的版本为 0.11.x 及以上。

配置步骤

1 添加 Maven 依赖

通过 Flink 消费火山引擎日志服务的日志数据时,需要使用 Flink 提供的 flink-connector-kafka 作为 Connctor。
在项目中添加 flink-connector-kafka 相关的 Maven 依赖。
依赖的详细信息如下:

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.16.0</version>
</dependency>

Kafka Source 提供了构建类来创建 KafkaSource 的实例。其使用方法和实现细节请参考 Flink
在构建 KafkaSource 时必须通过以下方法指定基础属性。

方法

说明

setBootstrapServers()

Bootstrap server,应配置为初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093。

  • 服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务地址,详细信息请参见服务地址
  • 端口号固定为 9093。

setTopics()

要订阅的 Kafka Topic。此处应配置为日志服务的 Kafka 协议消费主题 ID,格式为 out-日志主题ID,例如 out-0fdaa6b6-3c9f-424c-8664-fc0d222c****。 您可以在日志服务控制台的 Topic 详情页中查看并复制 Kafka 协议消费主题 ID
Image

setGroupId()

消费者组 ID。

setValueOnlyDeserializer()

用于解析 Kafka 消息的反序列化器(Deserializer),详细信息请参考消息解析

setProperty()

安全模式、授权模式等设置。应指定以下配置:

  • CommonClientConfigs.SECURITY_PROTOCOL_CONFIG指定为SASL_SSL
  • SaslConfigs.SASL_MECHANISM指定为PLAIN
  • SaslConfigs.SASL_JAAS_CONFIG中配置当前登录用户的鉴权信息。
    • useranme:Kafka SASL 用户名。应配置为日志服务的日志项目 ID。
    • password:Kafka SASL 用户密码。应配置为火山引擎账户密钥。格式为 ${access-key-id}#${access-key-secret},其中:
      • ${access-key-id} 应替换为您的 AccessKey ID。
      • ${access-key-secret} 应替换为您的 AccessKey Secret。
        详细配置示例请参考以下示例代码。

以下代码片段展示了如何构建 KafkaSource 来消费日志主题 “0fdaa6b6-3c9f-424c-8664-fc0d222c****” 中的日志数据。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Consuming data from TLS kafka
KafkaSource<String> tlsKafkaSource = KafkaSource.<String>builder()
        //初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093,其中:
        //服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址。
        //端口号固定为 9093。
        .setBootstrapServers(tls-cn-beijing.ivolces.com:9093)
        //要消费的topic,格式为out-xxxxxxxxx
        .setTopics(out-0fdaa6b6-3c9f-424c-8664-fc0d222c****)
        //消费组配置
        .setGroupId(consumeGroupID)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        //安全模式,tls只支持SASL_SSL
        .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL")
        //鉴权模式,tls只支持PLAIN
        .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN")
        //账号密码设置:
        //username:用户名,应配置为日志服务的日志项目ID
        //password:密码,应配置为火山引擎账户密钥,格式为 ${access-key-id}#${access-key-secret}。其中:
        //${access-key-id} 应替换为您的 AccessKey ID。
        //${access-key-secret} 应替换为您的 AccessKey Secret。
        .setProperty(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule " +
                        "required username=\"" + bd4d5b20-479a-4f33-9099-a44db3be**** + "\" password=\"" + AK****NWE2MTI#WldWa1p****dZd05JKJKHHUI== + "\";")
        .build();
DataStreamSource<String> data = env.fromSource(tlsKafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

// todo 自定义sink,需要用户自己实现自己的sink
Sink<String> customizedSink = CreateSink();
data.sinkTo(customizedSink);

env.execute("Flink Tls Kafka Job");