Open-source Flink cannot stream data into Object Storage Service (TOS) with EXACTLY_ONCE semantics; to do so, data must be written through the Proton SDK. Starting from Volcengine E-MapReduce (EMR) 3.2.1, EMR Flink ships with the Proton SDK in its runtime environment by default, so Flink can read from and write to TOS out of the box. For existing (older) clusters, you must download the Proton SDK and apply some additional configuration before this works; see the Proton release notes for manually downloading the Proton SDK.
Volcengine EMR clusters from version 3.2.1 onward already include the Proton dependencies by default, for both Hadoop data-lake clusters and Flink real-time computing clusters; no extra configuration is required.
For existing clusters earlier than 3.2.1, to add or upgrade the Flink Proton dependency, download the Proton SDK, copy it to every EMR node, and unpack it. Then place plugins/flink/proton-flink{flink.version}-bundle-{proton.version}.jar (for example plugins/flink/proton-flink1.16-bundle-2.0.1.jar) into the Flink lib directory /usr/lib/emr/current/flink/lib/; if an older version of the jar is already there, remove it. Also use proton-hadoop${hadoop.major.version}-bundle-{proton.version}.jar to replace the existing proton-hadoop${hadoop.major.version}-bundle-{old.proton.version}.jar under /usr/lib/emr/current/hadoop/share/hadoop/hdfs/. When copying, pick the jar that matches your Hadoop series; for example, on a Hadoop 2.x environment choose proton-hadoop2-bundle-{proton.version}.jar. A sketch of these steps is shown below.
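As a hedged illustration of the copy steps on one node (the archive name, unpack location, and jar locations inside the SDK are placeholders that depend on the Proton release you download):

# Unpack the downloaded Proton SDK (placeholder archive name and directory).
mkdir -p /tmp/proton-sdk
tar -xzf proton-{proton.version}.tar.gz -C /tmp/proton-sdk
cd /tmp/proton-sdk

# Flink bundle: remove any older copy, then install the new one.
rm -f /usr/lib/emr/current/flink/lib/proton-flink*-bundle-*.jar
cp plugins/flink/proton-flink1.16-bundle-2.0.1.jar /usr/lib/emr/current/flink/lib/

# Hadoop bundle: replace the old jar with the one matching your Hadoop major version (Hadoop 3.x shown).
rm -f /usr/lib/emr/current/hadoop/share/hadoop/hdfs/proton-hadoop3-bundle-*.jar
cp <path-to>/proton-hadoop3-bundle-2.0.1.jar /usr/lib/emr/current/hadoop/share/hadoop/hdfs/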
Volcengine EMR configures the TOS and IAM credentials by default, so no extra setup is needed. If you want to customize them, you can do so through core-site.xml or flink-conf.yaml. A flink-conf.yaml example:
fs.tos.access-key-id: xxxx
fs.tos.secret-access-key: xxx
# Set the endpoint for your region; when running on Volcengine ECS you can use the internal endpoint, e.g. http://tos-cn-xxx.ivolces.com
fs.tos.endpoint: http://tos-cn-guangzhou.volces.com
# Credentials can also be set per bucket
fs.tos.bucket.{bucketname}.access-key-id: xxx
fs.tos.bucket.{bucketname}.secret-access-key: xxx
If you need to read Parquet data stored on TOS through the Filesystem connector, the way the current Flink ParquetVectorizedInputFormat obtains its configuration requires the TOS and IAM credentials to be present in core-site.xml; otherwise the data cannot be read. For the full core-site.xml options, see the Using Proton with Hadoop - Configuration changes section. An example:
<configuration>
    <property>
        <name>fs.tos.access-key-id</name>
        <value>{iam.role.access.key}</value>
    </property>
    <property>
        <name>fs.tos.secret-access-key</name>
        <value>{iam.role.access.val}</value>
    </property>
    <property>
        <name>fs.tos.impl</name>
        <value>io.proton.fs.RawFileSystem</value>
    </property>
    <property>
        <name>fs.tos.endpoint</name>
        <value>http://tos-cn-guangzhou.volces.com</value>
    </property>
</configuration>
In a self-managed Hadoop + Flink cluster, to have Flink stream data into TOS with EXACTLY_ONCE semantics, download the Proton SDK and copy proton-flink{flink.version}-bundle-{proton.version}.jar into the Flink lib directory, as sketched below. For custom credentials, configure them as described in the Volcengine EMR - Authentication configuration section above.
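A minimal sketch of the copy step, assuming $FLINK_HOME points at your Flink installation and using placeholder version numbers:

# Use the jar matching your Flink and Proton versions (placeholders shown).
cp proton-flink1.16-bundle-2.0.1.jar $FLINK_HOME/lib/
# Restart the Flink cluster or session afterwards so the new jar is loaded.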
A standalone Flink cluster is handled much like the self-managed Hadoop + Flink cluster: after downloading the Proton SDK, copy proton-flink${flink.version}-bundle-${proton.version}.jar into the Flink lib directory, then add the IAM and TOS settings to core-site.xml or flink-conf.yaml. For custom credentials, see the Volcengine EMR - Authentication configuration section above.
proton-flink supports entropy injection: a designated string in the write path is replaced with a fixed-length random string. Randomizing part of the path spreads objects across multiple partitions, which works around the per-partition QPS limit of the object store and improves write throughput. For example, when writing to TOS, add the following parameters to flink-conf.yaml:
tos.entropy.key: <user-defined-key>
tos.entropy.length: <user-defined-length>
Note
Currently the Flink runtime applies entropy injection only to checkpoint data files. All other files, including checkpoint metadata and the external URI, are written without entropy injection so that checkpoint URIs remain predictable.
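As an illustration, assuming tos.entropy.key follows the same convention as Flink's built-in S3 entropy injection, the configured key string is embedded in the checkpoint path and replaced by random characters of the configured length when checkpoint data files are written (key and path below are placeholders):

tos.entropy.key: _entropy_
tos.entropy.length: 4
# Checkpoint base path containing the entropy key:
state.checkpoints.dir: tos://{bucket}/flink/checkpoints/_entropy_/my-job
# Checkpoint data files then land under paths such as tos://{bucket}/flink/checkpoints/f9a3/my-job/...,
# while other checkpoint files simply have the key stripped, keeping their URIs predictable.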
If your deployment uses Proton's cache acceleration mode and you also want to accelerate Flink reads and writes, add the following parameters to flink-conf.yaml or core-site.xml to enable acceleration.
proton.cache.enable: true
proton.metaserver.addresses: <proton-metaserver-host:port>
Note
Before enabling acceleration, weigh whether cache acceleration is actually worthwhile for your scenario, for example how often the data will be re-read and whether metadata must be consistent in real time. If a Flink job merely ingests data from an external source into object storage, and the ingested data will not be consumed by downstream systems in the short term, cache acceleration is not recommended for that synchronization job.
By default, streaming writes through the Hive connector are committed via rename. Most object stores do not support rename, so exactly-once semantics generally cannot be guaranteed when writing to object storage through the Hive connector. You can, however, set the following parameter in flink-conf.yaml to use Flink's native Parquet or ORC writers, which do support exactly-once semantics (effective only for the parquet and orc file formats):
table.exec.hive.fallback-mapred-writer: false
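If you prefer not to change the cluster-wide configuration, the same option can also be set per session in the SQL Client; a minimal sketch:

SET 'table.exec.hive.fallback-mapred-writer' = 'false';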
Start the SQL Client.
Note
When accessing TOS from the SQL Client, HADOOP_CLASSPATH must be set explicitly.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
/usr/lib/emr/current/flink/bin/sql-client.sh embedded
set execution.target=yarn-per-job;
Create the data source.
create table datagen (
    id int,
    first_name varchar,
    last_name varchar,
    phone bigint,
    address varchar,
    company int
) with (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100000',
    'fields.first_name.length' = '5',
    'fields.last_name.length' = '5',
    'fields.phone.min' = '0000000',
    'fields.phone.max' = '9999999',
    'fields.address.length' = '7',
    'fields.company.min' = '10',
    'fields.company.max' = '20'
);
Write data.
CREATE TABLE tos_parquet_user_sink_tbl (
    id int,
    first_name varchar,
    last_name varchar,
    phone bigint,
    address varchar,
    company int
) PARTITIONED BY (company) WITH (
    'connector' = 'filesystem',
    'path' = 'tos://{bucket}/xxxxx/flink/tos_parquet_user_tbl',
    'format' = 'parquet',
    'sink.rolling-policy.file-size' = '5MB',
    'sink.rolling-policy.rollover-interval' = '5min',
    'sink.rolling-policy.check-interval' = '1min',
    'parquet.compression' = 'SNAPPY' -- omit this option if compression is not needed
);

SET 'execution.checkpointing.interval' = '300s';

INSERT INTO tos_parquet_user_sink_tbl SELECT * FROM datagen;
Code sample.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-kafka-tos-demo</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>flink-kafka-tos-demo</name>

    <properties>
        <flink.version>1.16.1</flink.version>
        <hadoop.version>3.3.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>it.unimi.dsi</groupId>
                    <artifactId>fastutil</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.flink.KafkaToTosDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Business logic.
KafkaToTosDemo.java
/*
 * ByteDance Volcengine EMR, Copyright 2022.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class KafkaToTosDemo {

    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String outputPath = pt.get("tos.output.path");
        String topic = pt.getRequired("kafka.topic");
        String consumerGroup = pt.get("kafka.consumer.group.id", "kafka-to-tos-demo-group");
        String bootstrapServers = pt.getRequired("kafka.bootstrap.servers");
        String checkpointPath = pt.getRequired("checkpoint.path");
        long checkpointInterval = pt.getLong("checkpoint.interval", 10_000L);

        env.getCheckpointConfig().setCheckpointStorage(checkpointPath);
        env.getCheckpointConfig().setCheckpointInterval(checkpointInterval);

        ObjectMapper jsonParser = new ObjectMapper();
        env.fromSource(createKafkaSource(topic, bootstrapServers, consumerGroup),
                        WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
                .map(data -> {
                    JsonNode jsonNode = jsonParser.readValue(data, JsonNode.class);
                    return new Tuple2<>(jsonNode.get("ticker").toString(), 1);
                }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(v -> v.f0)
                // .timeWindow(Time.minutes(1)) // Tumbling window definition // Flink 1.11
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // since Flink 1.13
                .sum(1) // Count the appearances by ticker per partition
                .map(t -> new TickCount(t.f0, t.f1))
                .addSink(createTosSnappySinkFromStaticConfig(outputPath))
                .name("TOS Parquet Sink");

        env.execute("kafka-to-tos-demo");
    }

    private static KafkaSource<String> createKafkaSource(String topic,
                                                         String bootstrapServers, String consumerGroup) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(consumerGroup)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    private static StreamingFileSink<TickCount> createTosSinkFromStaticConfig(String outputPath) {
        return StreamingFileSink
                .forBulkFormat(new Path(outputPath), AvroParquetWriters.forReflectRecord(TickCount.class))
                .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("complete")
                        .withPartSuffix(".parquet")
                        .build())
                .build();
    }

    private static StreamingFileSink<TickCount> createTosSnappySinkFromStaticConfig(String outputPath) {
        return StreamingFileSink
                .forBulkFormat(new Path(outputPath),
                        CompressionAvroParquetWriters.forReflectRecord(TickCount.class, CompressionCodecName.SNAPPY))
                .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("complete")
                        .withPartSuffix(".parquet")
                        .build())
                .build();
    }
}
TickCount.java
/*
 * ByteDance Volcengine EMR, Copyright 2022.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink;

public class TickCount {
    private String tick;
    private int count;

    public TickCount(String tick, int count) {
        this.tick = tick;
        this.count = count;
    }

    public String getTick() {
        return tick;
    }

    public void setTick(String tick) {
        this.tick = tick;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}
CompressionAvroParquetWriters.java
Note
The AvroParquetWriters shipped with Flink does not support compression, so a custom implementation is required.
/*
 * ByteDance Volcengine EMR, Copyright 2022.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;

import java.io.IOException;

public class CompressionAvroParquetWriters {

    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(
            Class<T> type, CompressionCodecName compressionCodecName) {
        final String schemaString = SpecificData.get().getSchema(type).toString();
        final ParquetBuilder<T> builder =
                (out) -> createAvroParquetWriter(schemaString, SpecificData.get(), out, compressionCodecName);
        return new ParquetWriterFactory<>(builder);
    }

    /**
     * Creates a ParquetWriterFactory that accepts and writes Avro generic types. The Parquet
     * writers will use the given schema to build and write the columnar data.
     *
     * @param schema The schema of the generic type.
     */
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(
            Schema schema, CompressionCodecName compressionCodecName) {
        final String schemaString = schema.toString();
        final ParquetBuilder<GenericRecord> builder =
                (out) -> createAvroParquetWriter(schemaString, GenericData.get(), out, compressionCodecName);
        return new ParquetWriterFactory<>(builder);
    }

    /**
     * Creates a ParquetWriterFactory for the given type. The Parquet writers will use Avro to
     * reflectively create a schema for the type and use that schema to write the columnar data.
     *
     * @param type The class of the type to write.
     */
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type,
            CompressionCodecName compressionCodecName) {
        final String schemaString = ReflectData.get().getSchema(type).toString();
        final ParquetBuilder<T> builder =
                (out) -> createAvroParquetWriter(schemaString, ReflectData.get(), out, compressionCodecName);
        return new ParquetWriterFactory<>(builder);
    }

    private static <T> ParquetWriter<T> createAvroParquetWriter(
            String schemaString,
            GenericData dataModel,
            OutputFile out,
            CompressionCodecName compressionCodecName) throws IOException {
        final Schema schema = new Schema.Parser().parse(schemaString);
        return AvroParquetWriter.<T>builder(out)
                .withSchema(schema)
                .withDataModel(dataModel)
                .withCompressionCodec(compressionCodecName)
                .build();
    }

    // ------------------------------------------------------------------------

    /** Class is not meant to be instantiated. */
    private CompressionAvroParquetWriters() {
    }
}
Run the job.
You can generate test data with the following script.
# generate_dummy_data.py
import datetime
import json
import random


def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)
    }


if __name__ == '__main__':
    num = 1000
    with open("/root/kafka/dummy_data.json", "a") as f:
        for _ in range(0, num):
            f.write(json.dumps(get_data()) + '\n')
mkdir /root/kafka/
python generate_dummy_data.py
./bin/kafka-topics.sh --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} --create
./bin/kafka-console-producer.sh --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} < /root/kafka/dummy_data.json
Submit the job.
/usr/lib/emr/current/flink/bin/flink run-application \
    -t yarn-application /opt/flink-kafka-tos-demo-1.0-SNAPSHOT.jar \
    --tos.output.path tos://{bucket}/xxxx/flink/sdk/ticket_stat_snappy \
    --kafka.topic xxx_test \
    --kafka.bootstrap.servers {broker1_ip}:9092,{broker2_ip}:9092 \
    --checkpoint.path tos://{bucket}/xxxx/flink/ckp/ticket_snappy/
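After a few checkpoints have completed, you can verify the output (a hedged check, assuming the Proton Hadoop bundle and TOS credentials are configured as above so the tos:// scheme resolves):

hadoop fs -ls tos://{bucket}/xxxx/flink/sdk/ticket_stat_snappy/
# Committed files appear as complete-*.parquet under year=.../month=.../day=.../hour=... directories.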