Using Proton with Flink
Last updated: 2024.08.02 15:57:02. First published: 2023.05.17 15:03:57.

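Open-source Flink does not support streaming writes to the object storage service (TOS) with EXACTLY_ONCE semantics. To meet that requirement, write the data through the Proton SDK. Starting with Volcengine E-MapReduce (EMR) 3.2.1, Volcengine EMR Flink integrates the Proton SDK into the runtime environment by default, so you can read and write TOS from Flink out of the box. For existing clusters created on earlier versions, you need to download the Proton SDK and apply some additional configuration before it works. See Proton release versions to download the Proton SDK manually.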

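1 Volcengine EMR

1.1 Cluster configuration

Volcengine EMR clusters from version 3.2.1 onward include the Proton dependencies by default, in both Hadoop data lake clusters and Flink real-time computing clusters, so no additional configuration is needed.
For existing clusters created before version 3.2.1, to add or upgrade the Flink Proton dependency packages, download the Proton SDK, copy it to every EMR node, extract it, and then: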

  • Place plugins/flink/proton-flink{flink.version}-bundle-{proton.version}.jar (for example, plugins/flink/proton-flink1.16-bundle-2.0.1.jar) in the Flink lib directory /usr/lib/emr/current/flink/lib/. If an older version of the JAR is present, remove it.
  • Use proton-hadoop{hadoop.major.version}-bundle-{proton.version}.jar to replace proton-hadoop{hadoop.major.version}-bundle-{old.proton.version}.jar under /usr/lib/emr/current/hadoop/share/hadoop/hdfs/. Copy the JAR that matches your Hadoop series; for example, in a Hadoop 2.x environment, choose proton-hadoop2-bundle-{proton.version}.jar.
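The JAR names above follow a fixed pattern, so a small helper can derive them when the copy step is scripted across nodes. The following is a minimal sketch only: the function names are hypothetical, and the version numbers are just the example values used on this page.

```python
def flink_bundle_name(flink_version: str, proton_version: str) -> str:
    """Build the Proton Flink bundle file name, e.g. for Flink 1.16."""
    return f"proton-flink{flink_version}-bundle-{proton_version}.jar"


def hadoop_bundle_name(hadoop_version: str, proton_version: str) -> str:
    """Build the Proton Hadoop bundle file name from the Hadoop major version."""
    major = hadoop_version.split(".")[0]  # "2.10.2" -> "2"
    return f"proton-hadoop{major}-bundle-{proton_version}.jar"


if __name__ == "__main__":
    print(flink_bundle_name("1.16", "2.0.1"))
    print(hadoop_bundle_name("2.10.2", "2.0.1"))
```

Deriving the Hadoop bundle name from the major version only mirrors the rule above that a Hadoop 2.x environment uses the proton-hadoop2 bundle.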

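1.2 Authentication configuration

Volcengine EMR comes with the TOS and IAM authentication settings preconfigured, so no additional configuration is needed. To customize the settings, use core-site.xml or flink-conf.yaml. For flink-conf.yaml, refer to the following configuration: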

fs.tos.access-key-id: xxxx
fs.tos.secret-access-key: xxx
# Set the endpoint for each region; when running on Volcengine ECS, you can use the internal network address

# Authentication settings can also be configured per bucket

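When you read Parquet data stored on TOS through the Filesystem connector, the TOS and IAM authentication settings must be added to core-site.xml for the read to succeed, because of a limitation in how Flink's ParquetVectorizedInputFormat currently obtains its configuration. For the detailed core-site.xml settings, see the "Configuration changes" section of Using Proton with Hadoop.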
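If the same settings have to be mirrored into core-site.xml, the Hadoop property layout can be generated instead of written by hand. The sketch below is illustrative: the function name is hypothetical, only the two key names shown earlier are used, and the placeholder values are not real credentials. Which keys core-site.xml actually requires is defined in the Hadoop chapter referenced above.

```python
import xml.etree.ElementTree as ET


def to_core_site_xml(props: dict) -> str:
    """Render key/value pairs as Hadoop <configuration><property> XML."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    if hasattr(ET, "indent"):  # pretty-print where available (Python 3.9+)
        ET.indent(root)
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    print(to_core_site_xml({
        "fs.tos.access-key-id": "xxxx",      # placeholder value
        "fs.tos.secret-access-key": "xxx",   # placeholder value
    }))
```

The output is meant to be merged into an existing core-site.xml rather than to replace the file.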


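2 Self-managed Hadoop + Flink cluster

2.1 Cluster configuration

In a self-managed Hadoop + Flink cluster, to have Flink stream data to TOS with EXACTLY_ONCE semantics, download the Proton SDK and copy proton-flink{flink.version}-bundle-{proton.version}.jar into the Flink lib directory.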

2.2 Authentication configuration

For custom settings, see the Volcengine EMR - Authentication configuration section above.

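3 Standalone Flink cluster

3.1 Cluster configuration

A standalone Flink cluster is set up much like a self-managed Hadoop + Flink cluster: download the Proton SDK, copy proton-flink{flink.version}-bundle-{proton.version}.jar into the Flink lib directory, and then add the IAM and TOS settings to core-site.xml or flink-conf.yaml.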

3.2 Authentication configuration

For custom settings, see the Volcengine EMR - Authentication configuration section above.

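4 Custom configuration

4.1 Entropy injection

proton-flink supports entropy injection: a fixed-length random string replaces a specific substring in the write path. Randomizing part of the path spreads objects across multiple shards, which works around the per-shard QPS limit of object storage and improves write throughput.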

tos.entropy.key: <user-defined-key>
tos.entropy.length: <user-defined-length>


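Currently, the Flink runtime applies entropy injection only to checkpoint data files. All other files, including checkpoint metadata and externalized URIs, do not use entropy injection, which keeps checkpoint URIs predictable.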
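The effect of the two options can be illustrated outside Flink. The sketch below is not Proton's or Flink's implementation: the function names, the `_entropy_` marker, and the example path are hypothetical. It also assumes that files which skip entropy injection simply have the marker dropped from their path.

```python
import random
import string


def inject_entropy(path: str, entropy_key: str, length: int, rng=random) -> str:
    """Replace the first occurrence of entropy_key with `length` random characters."""
    alphabet = string.ascii_lowercase + string.digits
    entropy = "".join(rng.choice(alphabet) for _ in range(length))
    return path.replace(entropy_key, entropy, 1)


def strip_entropy(path: str, entropy_key: str) -> str:
    """Drop the marker (and its trailing slash) for files that must stay predictable."""
    return path.replace(entropy_key + "/", "", 1).replace(entropy_key, "", 1)


if __name__ == "__main__":
    base = "tos://my-bucket/checkpoints/_entropy_/my-job"  # hypothetical path
    print(inject_entropy(base, "_entropy_", 4))  # checkpoint data file location
    print(strip_entropy(base, "_entropy_"))      # checkpoint metadata location
```

Here `entropy_key` plays the role of tos.entropy.key and `length` the role of tos.entropy.length.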

4.2 Enabling Proton cache acceleration mode


proton.cache.enable: true
proton.metaserver.addresses: <proton-metaserver-host:port>



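4.3 Hive connector exactly-once semantics

By default, Flink streaming jobs commit written files by renaming them. Most object stores do not support rename, so writes to object storage through the Hive connector generally cannot provide exactly-once semantics. To get exactly-once semantics, set the following parameter in flink-conf.yaml so that Flink's native Parquet or ORC writer is used. This applies only to the Parquet and ORC file formats.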

table.exec.hive.fallback-mapred-writer: false

5 Usage examples

  1. Start the SQL Client.


    When you access TOS from the SQL Client, HADOOP_CLASSPATH must be set explicitly.

    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    /usr/lib/emr/current/flink/bin/ embedded
  2. Create the data source.

    create table datagen (
        id          int,
        first_name  varchar,
        last_name   varchar,
        phone       bigint,
        address     varchar,
        company     int
    ) with (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        '' = 'sequence',
        '' = '1',
        '' = '100000',
        'fields.first_name.length' = '5',
        'fields.last_name.length' = '5',
        '' = '0000000',
        '' = '9999999',
        'fields.address.length' = '7',
        '' = '10',
        '' = '20'
    );
  3. Write the data.

    CREATE TABLE tos_parquet_user_sink_tbl (
        id          int,
        first_name  varchar,
        last_name   varchar,
        phone       bigint,
        address     varchar,
        company     int
    ) PARTITIONED BY (company) WITH (
     'connector' = 'filesystem',
     'path' = 'tos://{bucket}/xxxxx/flink/tos_parquet_user_tbl',
     'format' = 'parquet',
     'sink.rolling-policy.file-size' = '5MB',
     'sink.rolling-policy.rollover-interval' = '5min',
     'sink.rolling-policy.check-interval' = '1min',
     'parquet.compression'='SNAPPY' -- Omit this option if compression is not needed
    );
    SET 'execution.checkpointing.interval' = '300s';
    INSERT INTO tos_parquet_user_sink_tbl SELECT * FROM datagen;
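Because the sink table is partitioned by company, the filesystem connector writes its files into Hive-style `key=value` partition directories under the table path. The sketch below only shows how such a directory name is formed; the function name and the bucket and path in the example are hypothetical.

```python
def partition_dir(base_path: str, partition_values: dict) -> str:
    """Return the Hive-style partition directory for the given partition columns."""
    parts = "/".join(f"{key}={value}" for key, value in partition_values.items())
    return f"{base_path.rstrip('/')}/{parts}/"


if __name__ == "__main__":
    table_path = "tos://my-bucket/demo/flink/tos_parquet_user_tbl"  # hypothetical
    # The datagen source on this page is meant to produce company values from 10 to 20.
    for company in (10, 15, 20):
        print(partition_dir(table_path, {"company": company}))
```

With a bulk format such as Parquet, files in these directories are finalized on checkpoints, so they become visible at the cadence set by execution.checkpointing.interval.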

  1. Code sample.

    1. pom.xml

      <project xmlns="" xmlns:xsi=""
                          <!-- Run shade goal on package phase -->
    2. Business logic.


        /*
         * ByteDance Volcengine EMR, Copyright 2022.
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         *     http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
        package org.apache.flink;

        // The Jackson, Tuple2 and ParameterTool imports were missing from this page; the Jackson package is an assumption.
        import com.fasterxml.jackson.databind.JsonNode;
        import com.fasterxml.jackson.databind.ObjectMapper;
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.api.common.typeinfo.Types;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
        import org.apache.flink.core.fs.Path;
        import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
        import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
        import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
        import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
        import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
        import org.apache.flink.streaming.api.windowing.time.Time;
        import org.apache.parquet.hadoop.metadata.CompressionCodecName;

        public class KafkaToTosDemo {
          public static void main(String[] args) throws Exception {
            ParameterTool pt = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            String outputPath = pt.get("tos.output.path");
            String topic = pt.getRequired("kafka.topic");
            String consumerGroup = pt.get("", "kafka-to-tos-demo-group");
            String bootstrapServers = pt.getRequired("kafka.bootstrap.servers");
            String checkpointPath = pt.getRequired("checkpoint.path");
            long checkpointInterval = pt.getLong("checkpoint.interval", 10_000L);
            // Assumed restoration: the checkpoint setup lines are missing from this page.
            // The file sink only commits files on checkpoints, so checkpointing must be on.
            env.enableCheckpointing(checkpointInterval);
            env.getCheckpointConfig().setCheckpointStorage(checkpointPath);
            ObjectMapper jsonParser = new ObjectMapper();
            env.fromSource(createKafkaSource(topic, bootstrapServers, consumerGroup),
                    WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
                .map(data -> {
                  JsonNode jsonNode = jsonParser.readValue(data, JsonNode.class);
                  return new Tuple2<>(jsonNode.get("ticker").toString(), 1);
                }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(v -> v.f0)
                // .timeWindow(Time.minutes(1)) // Tumbling window definition // Flink 1.11
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // since Flink 1.13
                .sum(1) // Count the appearances by ticker per partition
                .map(t -> new TickCount(t.f0, t.f1))
                // Assumed restoration: the sink call is missing from this page; the Snappy sink is used here.
                .addSink(createTosSnappySinkFromStaticConfig(outputPath))
                .name("TOS Parquet Sink");
            env.execute("KafkaToTosDemo"); // Assumed restoration; the job name is illustrative.
          }

          private static KafkaSource<String> createKafkaSource(String topic, String bootstrapServers, String consumerGroup) {
            // Only setValueOnlyDeserializer survives on this page; the other builder calls are
            // assumed from the method parameters. earliest() is also the builder default.
            return KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(consumerGroup)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
          }

          private static StreamingFileSink<TickCount> createTosSinkFromStaticConfig(String outputPath) {
            return StreamingFileSink
                .forBulkFormat(new Path(outputPath), AvroParquetWriters.forReflectRecord(TickCount.class))
                .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
                .build();
          }

          private static StreamingFileSink<TickCount> createTosSnappySinkFromStaticConfig(String outputPath) {
            return StreamingFileSink
                .forBulkFormat(new Path(outputPath),
                    CompressionAvroParquetWriters.forReflectRecord(TickCount.class, CompressionCodecName.SNAPPY))
                .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
                .build();
          }
        }

        /*
         * ByteDance Volcengine EMR, Copyright 2022.
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         *     http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
        package org.apache.flink;

        public class TickCount {
          private String tick;
          private int count;

          public TickCount(String tick, int count) {
            this.tick = tick;
            this.count = count;
          }

          public String getTick() {
            return tick;
          }

          public void setTick(String tick) {
            this.tick = tick;
          }

          public int getCount() {
            return count;
          }

          public void setCount(int count) {
            this.count = count;
          }
        }


        The AvroParquetWriters class that ships with Flink does not support compression, so it has to be reimplemented.

        /*
         * ByteDance Volcengine EMR, Copyright 2022.
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         *     http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
        package org.apache.flink;

        import java.io.IOException;
        import org.apache.avro.Schema;
        import org.apache.avro.generic.GenericData;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.reflect.ReflectData;
        import org.apache.avro.specific.SpecificData;
        import org.apache.avro.specific.SpecificRecordBase;
        import org.apache.flink.formats.parquet.ParquetBuilder;
        import org.apache.flink.formats.parquet.ParquetWriterFactory;
        import org.apache.parquet.avro.AvroParquetWriter;
        import org.apache.parquet.hadoop.ParquetWriter;
        import org.apache.parquet.hadoop.metadata.CompressionCodecName;
        import org.apache.parquet.io.OutputFile;

        public class CompressionAvroParquetWriters {

          public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(
              Class<T> type, CompressionCodecName compressionCodecName) {
            final String schemaString = SpecificData.get().getSchema(type).toString();
            final ParquetBuilder<T> builder =
                (out) -> createAvroParquetWriter(schemaString, SpecificData.get(), out, compressionCodecName);
            return new ParquetWriterFactory<>(builder);
          }

          /**
           * Creates a ParquetWriterFactory that accepts and writes Avro generic types. The Parquet
           * writers will use the given schema to build and write the columnar data.
           *
           * @param schema The schema of the generic type.
           */
          public static ParquetWriterFactory<GenericRecord> forGenericRecord(
              Schema schema,
              CompressionCodecName compressionCodecName) {
            final String schemaString = schema.toString();
            final ParquetBuilder<GenericRecord> builder =
                (out) -> createAvroParquetWriter(schemaString, GenericData.get(), out, compressionCodecName);
            return new ParquetWriterFactory<>(builder);
          }

          /**
           * Creates a ParquetWriterFactory for the given type. The Parquet writers will use Avro to
           * reflectively create a schema for the type and use that schema to write the columnar data.
           *
           * @param type The class of the type to write.
           */
          public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type, CompressionCodecName compressionCodecName) {
            final String schemaString = ReflectData.get().getSchema(type).toString();
            final ParquetBuilder<T> builder =
                (out) -> createAvroParquetWriter(schemaString, ReflectData.get(), out, compressionCodecName);
            return new ParquetWriterFactory<>(builder);
          }

          private static <T> ParquetWriter<T> createAvroParquetWriter(
              String schemaString, GenericData dataModel, OutputFile out,
              CompressionCodecName compressionCodecName) throws IOException {
            final Schema schema = new Schema.Parser().parse(schemaString);
            // The builder chain is missing from this page; it is restored here following Flink's
            // own AvroParquetWriters, with the compression codec added.
            return AvroParquetWriter.<T>builder(out)
                .withSchema(schema)
                .withDataModel(dataModel)
                .withCompressionCodec(compressionCodecName)
                .build();
          }

          // ------------------------------------------------------------------------

          /** Class is not meant to be instantiated. */
          private CompressionAvroParquetWriters() {}
        }
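Both sinks in KafkaToTosDemo use the DateTimeBucketAssigner pattern 'year='yyyy'/month='MM'/day='dd'/hour='HH/, which decides the sub-directory each record lands in. The sketch below reproduces that layout with an equivalent strftime pattern so that the resulting object paths are easier to anticipate. It is an illustration only: the function names and the output path are hypothetical, and which time zone applies depends on how the assigner is configured.

```python
from datetime import datetime

# strftime equivalent of the Java pattern 'year='yyyy'/month='MM'/day='dd'/hour='HH/
BUCKET_FORMAT = "year=%Y/month=%m/day=%d/hour=%H/"


def bucket_id(ts: datetime) -> str:
    """Return the bucket sub-directory for a record handled at time `ts`."""
    return ts.strftime(BUCKET_FORMAT)


def bucket_path(output_path: str, ts: datetime) -> str:
    """Join the sink output path with the bucket sub-directory."""
    return output_path.rstrip("/") + "/" + bucket_id(ts)


if __name__ == "__main__":
    out = "tos://my-bucket/demo/flink/sdk/ticket_stat_snappy"  # hypothetical path
    print(bucket_path(out, datetime.now()))
```

One directory per hour keeps each bucket small while still letting downstream queries prune by year, month, day, and hour.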
  2. Run the job.

    1. Generate test data with the following script.

      import datetime
      import json
      import random

      def get_data():
          return {
              'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
              'price': round(random.random() * 100, 2)
          }

      if __name__ == '__main__':
          num = 1000
          with open("/root/kafka/dummy_data.json", "a") as f:
              for _ in range(0, num):
                  f.write(json.dumps(get_data()) + '\n')

      mkdir /root/kafka/
      ./bin/ --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} --create
      ./bin/ --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} < /root/kafka/dummy_data.json
    2. Submit the job.

      /usr/lib/emr/current/flink/bin/flink run-application \
      -t yarn-application /opt/flink-kafka-tos-demo-1.0-SNAPSHOT.jar \
      --tos.output.path tos://{bucket}/xxxx/flink/sdk/ticket_stat_snappy \
      --kafka.topic xxx_test \
      --kafka.bootstrap.servers {broker1_ip}:9092,{broker2_ip}:9092 \
      --checkpoint.path tos://{bucket}/xxxx/flink/ckp/ticket_snappy/
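As a rough local cross-check of what the job writes, the per-ticker count can be reproduced over the generated JSON lines. The sketch below counts a whole batch at once, whereas the Flink job emits the same kind of count for each one-minute processing-time window, so the numbers are comparable only per window. The function name is hypothetical.

```python
import json
from collections import Counter


def count_tickers(lines) -> dict:
    """Count how often each ticker appears in an iterable of JSON lines."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        counts[json.loads(line)["ticker"]] += 1
    return dict(counts)


if __name__ == "__main__":
    sample = [
        '{"ticker": "AAPL", "price": 12.5}',
        '{"ticker": "TBV", "price": 80.1}',
        '{"ticker": "AAPL", "price": 33.0}',
    ]
    print(count_tickers(sample))
```

To run it against the generated file, pass an open file handle for /root/kafka/dummy_data.json instead of the sample list.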