Using Proton with Flink
Last updated: 2024-08-02 15:57:02 · First published: 2023-05-17 15:03:57

The open-source Flink distribution does not support streaming writes to the Object Storage Service (TOS) with EXACTLY_ONCE semantics; when you need this, data must be written through the Proton SDK. Starting with Volcengine E-MapReduce (EMR) 3.2.1, the EMR Flink runtime bundles the Proton SDK by default, so reading and writing TOS from Flink works out of the box. For existing older clusters, you must download the Proton SDK and perform some extra configuration before it can be used; see the Proton release notes for the manual download.

1 Volcengine EMR

1.1 Cluster configuration

Volcengine EMR clusters from version 3.2.1 onward bundle the Proton dependencies by default, in both the Hadoop data-lake cluster type and the Flink real-time computing cluster type, so no extra configuration is needed.
For existing clusters earlier than 3.2.1, to add or upgrade the Flink Proton dependencies, download the Proton SDK, copy it to every EMR node, and unpack it; then (a deployment sketch follows the list):

  • Place plugins/flink/proton-flink${flink.version}-bundle-${proton.version}.jar, e.g. plugins/flink/proton-flink1.16-bundle-2.0.1.jar, into the Flink lib directory /usr/lib/emr/current/flink/lib/; if an older version of the jar is already there, remove it.
  • Use proton-hadoop${hadoop.major.version}-bundle-${proton.version}.jar to replace proton-hadoop${hadoop.major.version}-bundle-${old.proton.version}.jar under /usr/lib/emr/current/hadoop/share/hadoop/hdfs/. Pick the jar that matches your Hadoop line, e.g. on a Hadoop 2.x environment choose proton-hadoop2-bundle-${proton.version}.jar.
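
A minimal per-node deployment sketch is shown below. The SDK archive name, the extraction path, and the 1.16 / 2.0.1 / Hadoop 3 versions are placeholders; adjust them to the package you actually downloaded and to your cluster's Flink and Hadoop versions.

# Hypothetical SDK archive name and layout -- adjust to the package you downloaded.
mkdir -p /tmp/proton-sdk && tar -xzf proton-sdk-2.0.1.tar.gz -C /tmp/proton-sdk

# Flink bundle: remove any older copy, then install into the Flink lib directory.
rm -f /usr/lib/emr/current/flink/lib/proton-flink1.16-bundle-*.jar
cp /tmp/proton-sdk/plugins/flink/proton-flink1.16-bundle-2.0.1.jar /usr/lib/emr/current/flink/lib/

# Hadoop bundle: replace the old jar under the HDFS share directory (Hadoop 3.x shown).
rm -f /usr/lib/emr/current/hadoop/share/hadoop/hdfs/proton-hadoop3-bundle-*.jar
cp /tmp/proton-sdk/proton-hadoop3-bundle-2.0.1.jar /usr/lib/emr/current/hadoop/share/hadoop/hdfs/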

1.2 Authentication configuration

Volcengine EMR ships with the TOS and IAM credentials already configured, so no extra setup is required. If you need custom settings, configure them in core-site.xml or flink-conf.yaml. For flink-conf.yaml, refer to the following example:

fs.tos.access-key-id: xxxx
fs.tos.secret-access-key: xxx
# Set the endpoint of the corresponding region; when running on Volcengine ECS you can use the private endpoint, e.g. http://tos-cn-xxx.ivolces.com
fs.tos.endpoint: http://tos-cn-guangzhou.volces.com 

# Credentials can also be configured per bucket
fs.tos.bucket.{bucketname}.access-key-id: xxx
fs.tos.bucket.{bucketname}.secret-access-key: xxx

If you need to read Parquet data stored on TOS through the Filesystem connector, the TOS and IAM credentials must also be added to core-site.xml, because of how Flink's ParquetVectorizedInputFormat currently obtains its configuration; otherwise the data cannot be read. For the full core-site.xml settings, see the section "Using Proton with Hadoop - Configuration changes".

<configuration>
   <property>
        <name>fs.tos.access-key-id</name>
        <value>{iam.role.access.key}</value>
   </property>
   <property>
        <name>fs.tos.secret-access-key</name>
        <value>{iam.role.access.val}</value>
    </property>
    <property>
        <name>fs.tos.impl</name>
        <value>io.proton.fs.RawFileSystem</value>
    </property>
    <property>
        <name>fs.tos.endpoint</name>
        <value>http://tos-cn-guangzhou.volces.com</value>
    </property>
</configuration>

2 Self-managed Hadoop + Flink cluster

2.1 Cluster configuration

In a self-managed Hadoop + Flink cluster, to let Flink stream data into TOS with EXACTLY_ONCE semantics, download the Proton SDK and copy proton-flink${flink.version}-bundle-${proton.version}.jar into the Flink lib directory.
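
For example, a minimal sketch, assuming Flink 1.16, Proton 2.0.1, and $FLINK_HOME pointing at your Flink installation (all placeholders):

cp proton-flink1.16-bundle-2.0.1.jar $FLINK_HOME/lib/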

2.2 Authentication configuration

Refer to the Volcengine EMR - Authentication configuration section above for custom settings.

3 Standalone Flink cluster

3.1 Cluster configuration

A standalone Flink cluster is similar to a self-managed Hadoop + Flink cluster: after downloading the Proton SDK, copy proton-flink${flink.version}-bundle-${proton.version}.jar into the Flink lib directory, then add the IAM and TOS settings to core-site.xml or flink-conf.yaml.

3.2 Authentication configuration

Refer to the Volcengine EMR - Authentication configuration section above for custom settings.

4 Custom configuration

4.1 Entropy injection

proton-flink supports entropy injection: a fixed-length random string replaces a specific substring of the write path. Randomizing part of the path spreads objects across multiple partitions, which works around the per-partition QPS limit of object storage and improves write throughput.
For example, when the target object store is TOS, add the following parameters to flink-conf.yaml:

tos.entropy.key: <user-defined-key>
tos.entropy.length: <user-defined-length>

Note

Currently the Flink runtime applies entropy injection only to checkpoint data files. All other files, including checkpoint metadata and external URIs, do not use entropy injection, so that checkpoint URIs remain predictable.
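
A minimal sketch of how the two options fit together, assuming Proton's entropy injection follows the same convention as Flink's built-in S3 entropy injection: the user-defined key string in the checkpoint path is replaced with random characters for checkpoint data files (the _entropy_ marker, length, bucket, and path below are placeholders).

tos.entropy.key: _entropy_
tos.entropy.length: 4
state.checkpoints.dir: tos://{bucket}/flink/checkpoints/_entropy_/my-job

# A checkpoint data file would then land under a randomized prefix, for example:
# tos://{bucket}/flink/checkpoints/x9pz/my-job/<job-id>/chk-17/...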

4.2 Enabling Proton cache acceleration

If your scenario uses Proton's cache acceleration mode and you also want Flink reads and writes to benefit from it, add the following parameters to flink-conf.yaml or core-site.xml to enable acceleration:

proton.cache.enable: true
proton.metaserver.addresses: <proton-metaserver-host:port>

Note

Before enabling acceleration, consider whether the cache acceleration mode is really necessary for your workload, for example whether the data will be reused and whether metadata must be consistent in real time. If a Flink job merely ingests data from an external source and lands it on object storage, and the ingested data will not be consumed by downstream systems in the short term, we do not recommend enabling cache acceleration for that data-sync job.

4.3 Exactly-once semantics with the Hive connector

By default, streaming writes in Flink's Hive connector commit files via rename. Most object stores do not support rename, so exactly-once semantics are normally not available when writing to object storage through the Hive connector. You can, however, switch to Flink's native Parquet or ORC writers by changing the following parameter in flink-conf.yaml, which enables exactly-once semantics (effective only for the parquet and orc file formats):

table.exec.hive.fallback-mapred-writer: false
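
If you prefer not to change flink-conf.yaml, the same option can also be set per session in the SQL Client, for example:

SET 'table.exec.hive.fallback-mapred-writer' = 'false';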

5 Usage examples

  1. Start the SQL Client.

    Note

    When accessing TOS from the SQL Client, HADOOP_CLASSPATH must be set explicitly:

    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    /usr/lib/emr/current/flink/bin/sql-client.sh embedded
    
    set execution.target=yarn-per-job;
    
  2. Create a data source.

    create table datagen (
        id          int,
        first_name  varchar,
        last_name   varchar,
        phone       bigint,
        address     varchar,
        company     int
    ) with (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '100000',
        'fields.first_name.length' = '5',
        'fields.last_name.length' = '5',
        'fields.phone.min' = '0000000',
        'fields.phone.max' = '9999999',
        'fields.address.length' = '7',
        'fields.company.min' = '10',
        'fields.company.max' = '20'
    );
    
  3. Write data.

    CREATE TABLE tos_parquet_user_sink_tbl (
        id          int,
        first_name  varchar,
        last_name   varchar,
        phone       bigint,
        address     varchar,
        company     int
    ) PARTITIONED BY (company) WITH (
     'connector' = 'filesystem',
     'path' = 'tos://{bucket}/xxxxx/flink/tos_parquet_user_tbl',
     'format' = 'parquet',
     'sink.rolling-policy.file-size' = '5MB',
     'sink.rolling-policy.rollover-interval' = '5min',
     'sink.rolling-policy.check-interval' = '1min',
     'parquet.compression'='SNAPPY' -- omit this option if compression is not needed
    );
    
    SET 'execution.checkpointing.interval' = '300s';
    
    INSERT INTO tos_parquet_user_sink_tbl SELECT * FROM datagen;
    

  1. Code sample (a DataStream JAR job that reads from Kafka and writes Parquet files to TOS).

    1. pom.xml

      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-kafka-tos-demo</artifactId>
          <packaging>jar</packaging>
          <version>1.0-SNAPSHOT</version>
      
          <name>flink-kafka-tos-demo</name>
      
          <properties>
              <flink.version>1.16.1</flink.version>
              <hadoop.version>3.3.4</hadoop.version>
          </properties>
      
          <dependencies>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-core</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-common</artifactId>
                  <version>${hadoop.version}</version>
                  <scope>provided</scope>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-kafka</artifactId>
                  <version>${flink.version}</version>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-parquet</artifactId>
                  <version>${flink.version}</version>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.parquet</groupId>
                  <artifactId>parquet-avro</artifactId>
                  <version>1.12.2</version>
                  <exclusions>
                      <exclusion>
                          <groupId>org.apache.hadoop</groupId>
                          <artifactId>hadoop-client</artifactId>
                      </exclusion>
                      <exclusion>
                          <groupId>it.unimi.dsi</groupId>
                          <artifactId>fastutil</artifactId>
                      </exclusion>
                  </exclusions>
              </dependency>
          </dependencies>
      
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-shade-plugin</artifactId>
                      <version>3.2.1</version>
                      <executions>
                          <!-- Run shade goal on package phase -->
                          <execution>
                              <phase>package</phase>
                              <goals>
                                  <goal>shade</goal>
                              </goals>
                              <configuration>
                                  <transformers>
                                      <transformer
                                              implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                          <mainClass>org.apache.flink.KafkaToTosDemo</mainClass>
                                      </transformer>
                                  </transformers>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <configuration>
                          <source>8</source>
                          <target>8</target>
                      </configuration>
                  </plugin>
              </plugins>
          </build>
      </project>
      
    2. Business logic.

      • KafkaToTosDemo.java

        /*
         * ByteDance Volcengine EMR, Copyright 2022.
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         *     http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
        
        package org.apache.flink;
        
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.api.common.typeinfo.Types;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
        import org.apache.flink.core.fs.Path;
        import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
        import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
        import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
        import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
        import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
        import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
        import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
        import org.apache.flink.streaming.api.windowing.time.Time;
        import org.apache.parquet.hadoop.metadata.CompressionCodecName;
        
        public class KafkaToTosDemo {
          public static void main(String[] args) throws Exception {
            ParameterTool pt = ParameterTool.fromArgs(args);
        
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
            String outputPath = pt.get("tos.output.path");
            String topic = pt.getRequired("kafka.topic");
            String consumerGroup = pt.get("kafka.consumer.group.id", "kafka-to-tos-demo-group");
            String bootstrapServers = pt.getRequired("kafka.bootstrap.servers");
            String checkpointPath = pt.getRequired("checkpoint.path");
            long checkpointInterval = pt.getLong("checkpoint.interval", 10_000L);
        
            env.getCheckpointConfig().setCheckpointStorage(checkpointPath);
            env.getCheckpointConfig().setCheckpointInterval(checkpointInterval);
        
            ObjectMapper jsonParser = new ObjectMapper();
        
            env.fromSource(createKafkaSource(topic, bootstrapServers, consumerGroup),
                    WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
                .map(data -> {
                  JsonNode jsonNode = jsonParser.readValue(data, JsonNode.class);
                  return new Tuple2<>(jsonNode.get("ticker").asText(), 1);
                }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(v -> v.f0)
                // .timeWindow(Time.minutes(1)) // Tumbling window definition // Flink 1.11
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // since Flink 1.13
                .sum(1) // Count the appearances by ticker per partition
                .map(t -> new TickCount(t.f0, t.f1))
                .addSink(createTosSnappySinkFromStaticConfig(outputPath))
                .name("TOS Parquet Sink");
        
            env.execute("kafka-to-tos-demo");
          }
        
          private static KafkaSource<String> createKafkaSource(String topic, String bootstrapServers, String consumerGroup) {
            return KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(consumerGroup)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
          }
        
          private static StreamingFileSink<TickCount> createTosSinkFromStaticConfig(String outputPath) {
            return StreamingFileSink
                .forBulkFormat(new Path(outputPath), AvroParquetWriters.forReflectRecord(TickCount.class))
                .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder()
                    .withPartPrefix("complete")
                    .withPartSuffix(".parquet")
                    .build())
                .build();
          }
        
          private static StreamingFileSink<TickCount> createTosSnappySinkFromStaticConfig(String outputPath) {
            return StreamingFileSink
                .forBulkFormat(new Path(outputPath),
                    CompressionAvroParquetWriters.forReflectRecord(TickCount.class, CompressionCodecName.SNAPPY))
                .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder()
                    .withPartPrefix("complete")
                    .withPartSuffix(".parquet")
                    .build())
                .build();
          }
        }
        
      • TickCount.java

        /*
         * ByteDance Volcengine EMR, Copyright 2022.
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         *     http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
        
        package org.apache.flink;
        
        public class TickCount {
          private String tick;
          private int count;
        
          public TickCount(String tick, int count) {
            this.tick = tick;
            this.count = count;
          }
        
          public String getTick() {
            return tick;
          }
        
          public void setTick(String tick) {
            this.tick = tick;
          }
        
          public int getCount() {
            return count;
          }
        
          public void setCount(int count) {
            this.count = count;
          }
        }
        
      • CompressionAvroParquetWriters.java

        Note

        The AvroParquetWriters shipped with Flink do not support compression, so a custom implementation is needed.

        /*
         * ByteDance Volcengine EMR, Copyright 2022.
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         *     http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
        
        package org.apache.flink;
        
        import org.apache.avro.Schema;
        import org.apache.avro.generic.GenericData;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.reflect.ReflectData;
        import org.apache.avro.specific.SpecificData;
        import org.apache.avro.specific.SpecificRecordBase;
        import org.apache.flink.formats.parquet.ParquetBuilder;
        import org.apache.flink.formats.parquet.ParquetWriterFactory;
        import org.apache.parquet.avro.AvroParquetWriter;
        import org.apache.parquet.hadoop.ParquetWriter;
        import org.apache.parquet.hadoop.metadata.CompressionCodecName;
        import org.apache.parquet.io.OutputFile;
        
        import java.io.IOException;
        
        public class CompressionAvroParquetWriters {
          public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(
              Class<T> type, CompressionCodecName compressionCodecName) {
            final String schemaString = SpecificData.get().getSchema(type).toString();
            final ParquetBuilder<T> builder =
                (out) -> createAvroParquetWriter(schemaString, SpecificData.get(), out, compressionCodecName);
            return new ParquetWriterFactory<>(builder);
          }
        
          /**
           * Creates a ParquetWriterFactory that accepts and writes Avro generic types. The Parquet
           * writers will use the given schema to build and write the columnar data.
           *
           * @param schema The schema of the generic type.
           */
          public static ParquetWriterFactory<GenericRecord> forGenericRecord(
              Schema schema,
              CompressionCodecName compressionCodecName) {
            final String schemaString = schema.toString();
            final ParquetBuilder<GenericRecord> builder =
                (out) -> createAvroParquetWriter(schemaString, GenericData.get(), out, compressionCodecName);
            return new ParquetWriterFactory<>(builder);
          }
        
          /**
           * Creates a ParquetWriterFactory for the given type. The Parquet writers will use Avro to
           * reflectively create a schema for the type and use that schema to write the columnar data.
           *
           * @param type The class of the type to write.
           */
          public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type, CompressionCodecName compressionCodecName) {
            final String schemaString = ReflectData.get().getSchema(type).toString();
            final ParquetBuilder<T> builder =
                (out) -> createAvroParquetWriter(schemaString, ReflectData.get(), out, compressionCodecName);
            return new ParquetWriterFactory<>(builder);
          }
        
          private static <T> ParquetWriter<T> createAvroParquetWriter(
              String schemaString, GenericData dataModel, OutputFile out,
              CompressionCodecName compressionCodecName) throws IOException {
        
            final Schema schema = new Schema.Parser().parse(schemaString);
        
            return AvroParquetWriter.<T>builder(out)
                .withSchema(schema)
                .withDataModel(dataModel)
                .withCompressionCodec(compressionCodecName)
                .build();
          }
        
          // ------------------------------------------------------------------------
        
          /**
           * Class is not meant to be instantiated.
           */
          private CompressionAvroParquetWriters() {
          }
        }
        
  2. Run the job.

    1. Generate test data with the following script.

      # generate_dummy_data.py
      import datetime
      import json
      import random
      
      
      def get_data():
          return {
              'event_time': datetime.datetime.now().isoformat(),
              'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
              'price': round(random.random() * 100, 2)
          }
      
      if __name__ == '__main__':
          num = 1000
          with open("/root/kafka/dummy_data.json", "a") as f:
              for _ in range(0, num):
                  f.write(json.dumps(get_data()) + '\n')
      
      mkdir /root/kafka/
      python generate_dummy_data.py
      
      ./bin/kafka-topics.sh --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} --create
      
      ./bin/kafka-console-producer.sh --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} < /root/kafka/dummy_data.json
      
    2. Submit the job.

      /usr/lib/emr/current/flink/bin/flink run-application \
      -t yarn-application /opt/flink-kafka-tos-demo-1.0-SNAPSHOT.jar \
      --tos.output.path tos://{bucket}/xxxx/flink/sdk/ticket_stat_snappy \
      --kafka.topic xxx_test \
      --kafka.bootstrap.servers {broker1_ip}:9092,{broker2_ip}:9092 \
      --checkpoint.path tos://{bucket}/xxxx/flink/ckp/ticket_snappy/