You need to enable JavaScript to run this app.
导航
通过 Flink 传输数据
最近更新时间:2024.07.01 14:57:07首次发布时间:2024.06.28 15:52:14

日志服务全面集成 Flink 生态体系,不仅支持接收由 Flink connector 写入的日志,还支持通过 flink-connector-kafka 将日志消费到下游的大数据组件中。

背景信息

Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架,具有丰富的连接器生态,例如 Flink connector 和 Apache Kafka 连接器(flink-connector-kafka 插件)。
日志服务全面集成 Flink 生态体系,支持通过 Flink connector 轻松接入不同源头的各类日志,实现日志的实时采集、深度分析与结构化处理等操作;支持通过 flink-connector-kafka 插件无缝对接大数据处理与分析平台,将处理后的日志消费到下游的大数据组件或者数据仓库。

前提条件

  • 已创建日志项目和日志主题。详细操作步骤请参考创建资源
  • 已为指定日志主题开启 Kafka 协议消费功能,并获取Kafka协议消费主题ID。详细说明请参考通过 Kafka 协议消费日志
  • 推荐使用 IAM 用户进行访问鉴权。使用 IAM 用户前,需确认火山引擎主账号已创建 IAM 用户,且已为其授予消费相关的权限。详细说明请参考可授予的权限
  • 已获取当前登录账号的密钥 Access Key。详细信息请参考创建密钥

说明

目前只支持 Flink at least once 语义,在任务失败时,写入日志服务中的数据有可能会重复,但不会丢失。

操作步骤

  1. 初始化 FlinkLogSInk。

    1. 设置生产参数。
      一般情况下使用默认值即可,如有需要可以自定义配置。配置示例,请参考配置示例

      配置项

      是否必选

      默认值

      描述

      endpint

      /

      日志服务的服务地址,例如:tls-cn-beijing.volces.com。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址

      说明

      服务地址无需指定 https://

      region

      /

      日志项目所在地域,例如:cn-beijing。

      accessKey

      /

      火山引擎账号或 IAM 账号的 Access Key ID。您可以在火山引擎控制台密钥管理页面,获取 Access Key ID。
      推荐使用 IAM 用户进行鉴权,并将其 Access Key ID 配置在环境变量中。

      AccessSecret

      /

      火山引擎主账号或 IAM 用户的 Secret Access Key。您可以在火山引擎控制台密钥管理页面,获取 Secret Access Key。
      推荐使用 IAM 用户进行鉴权,并将其 Secret Access Key 配置在环境变量中。

      totalSizeInBytes

      104857600

      日志服务最大可用的缓存发送日志内存大小。

      maxThreadCount

      50

      日志服务发送日志最大可用线程数。

      maxBatchSizeBytes

      524288

      单个请求发送的日志大小,最大不超过 10MB。

      maxBatchCount

      4096

      单个请求发送的日志条数,最大不超过 40960。

      lingerMs

      2000

      单个请求的最长等待时间,单位:毫秒。

      maxBlockMs

      60000L

      申请可用内存最长等待时间,单位:毫秒

      retryCount

      2

      发送失败可重试的次数,最大不超过 4 次。

      maxReservedAttempts

      3

      可保留的请求记录数,最大不超过 5。

    2. 重载 LogSerializationFunction 函数,将数据序列化为 FlinkLogGroup。

      /** SimpleStringFunction. */
      public static class SimpleStringFunction implements LogSerializationFunction<String> {
      
          private final String topic;
      
          public SimpleStringFunction(String topic) {
              this.topic = topic;
          }
      
          @Override
          public FlinkLogGroup serialize(String element) {
              FlinkLogGroup group = new FlinkLogGroup();
              group.setTopic(topic);
              LogItem log = new LogItem();
              log.setTime(System.currentTimeMillis());
              log.addContent("content", element);
              group.getLogs().add(log);
              return group;
          }
      }
      
  2. 模拟数据写入日志服务。

    /** 模拟生产消息。 */
    public static class RandomStringSource implements ParallelSourceFunction<String> {
        private volatile boolean running = true;
    
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" getLongString(30));
            }
        }
    
        @Override
        public void cancel() {
            running = false;
        }
    
        public static String getLongString(int length) {
            String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
            Random random = new Random();
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < length; i++) {
                int number = random.nextInt(62);
                sb.append(str.charAt(number));
            }
            return sb.toString();
        }
    }
    

配置示例

您可以参考如下示例完成日志写入。

package org.apache.flink.connectors.tls;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connectors.tls.config.LogConstants;
import org.apache.flink.connectors.tls.entry.FlinkLogGroup;
import org.apache.flink.connectors.tls.entry.LogSerializationFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import com.volcengine.model.tls.LogItem;

import java.util.Random;

/** TLS sink sample. */
public class FlinkLogSinkSample {

    private static final String DEFAULT_TLS_ENDPOINT = "http://tls-cn-guilin-boe.volces.com";
    private static final String DEFAULT_REGION = "cn-guilin-boe";
    private static final String ACCESS_KEY_ID = "";
    private static final String ACCESS_KEY_SECRET = "";

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);
        env.enableCheckpointing(5000);
        DataStream<String> stream = env.addSource(new RandomStringSource());

        // 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey
        // 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455
        // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空
        // FlinkLogSink初始化时配置有默认值,可根据实际情况调整
        FlinkLogSink<String> flinkLogSink =
                new FlinkLogSink.Builder<String>(
                                params.get(LogConstants.LOG_ENDPOINT, DEFAULT_TLS_ENDPOINT),
                                params.get(LogConstants.LOG_REGION, DEFAULT_REGION),
                                params.get(LogConstants.LOG_ACCESS_KEY, ACCESS_KEY_ID),
                                params.get(LogConstants.LOG_ACCESS_SECRET, ACCESS_KEY_SECRET))
                        .setSerializationFunction(new SimpleStringFunction(params.get("TOPIC")))
                        .build();
        stream.addSink(flinkLogSink);
        env.execute("flink log producer");
    }

    /** 模拟生产消息. */
    public static class RandomStringSource implements ParallelSourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" getLongString(30));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        public static String getLongString(int length) {
            String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
            Random random = new Random();
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < length; i++) {
                int number = random.nextInt(62);
                sb.append(str.charAt(number));
            }
            return sb.toString();
        }
    }

    /** SimpleStringFunction. */
    public static class SimpleStringFunction implements LogSerializationFunction<String> {

        private final String topic;

        public SimpleStringFunction(String topic) {
            this.topic = topic;
        }

        @Override
        public FlinkLogGroup serialize(String element) {
            FlinkLogGroup group = new FlinkLogGroup();
            group.setTopic(topic);
            LogItem log = new LogItem();
            log.setTime(System.currentTimeMillis());
            log.addContent("content", element);
            group.getLogs().add(log);
            return group;
        }
    }
}

通过 Flink 消费火山引擎日志服务的日志数据时,需要使用 Flink 提供的 flink-connector-kafka 作为 Connctor。

说明

Apache Flink 内置了多个 Kafka client,不同 Flink 发行版之间其使用的客户端版本可能会发生改变。目前日志服务仅支持 0.11.x 及以上的 Kafka client 连接,对应的 Flink 版本为 1.7.x 及以上,如果使用 1.7.x 以下的 Flink,需要手动指定 Kafka client的版本为 0.11.x 及以上。

1 添加 Maven 依赖

在项目中添加 flink-connector-kafka 相关的 Maven 依赖。依赖的详细信息如下:

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.16.0</version>
</dependency>

Kafka Source 提供了构建类来创建 KafkaSource 的实例。其使用方法和实现细节请参考 Flink 官方文档
在构建 KafkaSource 时必须通过以下方法指定基础属性。

方法

说明

setBootstrapServers()

Bootstrap server,应配置为初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093。

  • 服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务地址,详细信息请参见服务地址
  • 端口号固定为 9093。

setTopics()

要订阅的 Kafka Topic。此处应配置为日志服务的 Kafka 协议消费主题 ID,格式为 out-日志主题ID,例如 out-0fdaa6b6-3c9f-424c-8664-fc0d222c****。 您可以在日志服务控制台的 Topic 详情页中查看并复制 Kafka 协议消费主题 ID
图片

setGroupId()

消费者组 ID。

setValueOnlyDeserializer()

用于解析 Kafka 消息的反序列化器(Deserializer),详细信息请参考消息解析

setProperty()

安全模式、授权模式等设置。应指定以下配置:

  • CommonClientConfigs.SECURITY_PROTOCOL_CONFIG指定为SASL_SSL
  • SaslConfigs.SASL_MECHANISM指定为PLAIN
  • SaslConfigs.SASL_JAAS_CONFIG中配置当前登录用户的鉴权信息。
    • useranme:Kafka SASL 用户名。应配置为日志服务的日志项目 ID。
    • password:Kafka SASL 用户密码。应配置为火山引擎账户密钥。格式为 ${access-key-id}#${access-key-secret},其中:
      • ${access-key-id} 应替换为您的 AccessKey ID。
      • ${access-key-secret} 应替换为您的 AccessKey Secret。
        详细配置示例请参考以下示例代码。

以下代码片段展示了如何构建 KafkaSource 来消费日志主题 0fdaa6b6-3c9f-424c-8664-fc0d222c**** 中的日志数据。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Consuming data from TLS kafka
KafkaSource<String> tlsKafkaSource = KafkaSource.<String>builder()
        //初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093,其中:
        //服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址。
        //端口号固定为 9093。
        .setBootstrapServers(tls-cn-beijing.ivolces.com:9093)
        //要消费的topic,格式为out-xxxxxxxxx
        .setTopics(out-0fdaa6b6-3c9f-424c-8664-fc0d222c****)
        //消费组配置
        .setGroupId(consumeGroupID)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        //安全模式,tls只支持SASL_SSL
        .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL")
        //鉴权模式,tls只支持PLAIN
        .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN")
        //账号密码设置:
        //username:用户名,应配置为日志服务的日志项目ID
        //password:密码,应配置为火山引擎账户密钥,格式为 ${access-key-id}#${access-key-secret}。其中:
        //${access-key-id} 应替换为您的 AccessKey ID。
        //${access-key-secret} 应替换为您的 AccessKey Secret。
        .setProperty(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule " +
                        "required username=\"" + bd4d5b20-479a-4f33-9099-a44db3be**** + "\" password=\"" + AK****NWE2MTI#WldWa1p****dZd05JKJKHHUI== + "\";")
        .build();
DataStreamSource<String> data = env.fromSource(tlsKafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

// todo 自定义sink,需要用户自己实现自己的sink。
Sink<String> customizedSink = CreateSink();
data.sinkTo(customizedSink);

env.execute("Flink Tls Kafka Job");