Apache Flink 是一个可分布式的开源计算框架,能够支持数据流处理和批量数据处理两种应用类型。本文介绍下在 Flink 中操作 Iceberg 表。
E-MapReduce(EMR)1.4.0版本之后的版本(包括1.4.0版本)支持在 Flink 中操作 Iceberg 表。
EMR 2.1.0 版本之后的版本(包含2.1.0版本)支持在 Flink 中操作 Iceberg 表。
已创建 EMR 集群,安装有 Iceberg 组件和 Flink 组件。有两种方式可以安装Iceberg和Flink组件:
本文介绍如何采用 Flink SQL 方式操作 Iceberg 表。 如果您希望采用 Flink DataStream API 来访问Apache Iceberg 表,则请参考 Iceberg官网 进行操作。
配置 Flink 的 checkpoint
因为目前 Flink 是在每次 checkpoint 的时候提交 Iceberg 的信息,所以需要配置 flink 的checkpoint。
在 集群管理 > 集群列表 > 具体集群名称,进入集群详情 界面.
导航栏中点击 服务列表,点击 Iceberg 服务并进入。
点击emr集群节点的ECS ID,跳转进入到云服务器的实例界面,点击右上角的 远程连接 按钮,输入集群创建时的root密码,进入远程终端。
在 /usr/lib/emr/current/flink/conf/flink-conf.yaml 文件的 checkpoint参数下,添加如下配置:
execution.checkpointing.interval: 10s # checkpoint间隔时间 execution.checkpointing.tolerable-failed-checkpoints: 10 # checkpoint 失败容忍次数
yaml文件配置完成后,使用以下命令,启动Flink SQL Client:
export HADOOP_CLASSPATH=`hadoop classpath` cd /usr/lib/emr/current/flink # 拷贝Iceberg需要的依赖 cp /usr/lib/emr/current/iceberg/lib/iceberg-flink-runtime-*.jar lib cp /usr/lib/emr/current/flink/connectors/flink-sql-connector-hive-*.jar lib # 采用 flink on yarn模式,启动YARN Session ./bin/yarn-session.sh --detached # Start the flink SQL client ./bin/sql-client.sh embedded shell
不同 EMR 版本,Flink 和 Iceberg 的版本号可能不同。建议采用如下命令来定位您的 iceberg-flink-runtime-xx.xx.xx.jar 和 flink-sql-connector-hive-xx.xx.xx.jar 路径。
EMR3.x 和 EMR1.x 版本采用下面命令:
$ ls /usr/lib/emr/current/iceberg/lib/iceberg-flink-runtime-*.jar
创建 Catalog
CREATE CATALOG <catalog_name> WITH ( 'type'='iceberg', '<config_key>'='<config_value>' );
目前 Iceberg 主要支持 HiveCatalog 和 HadoopCatalog。
HiveCatalog 将当前表的元数据文件路径存储在 Hive Metastore,所以每次读写 Iceberg 表都需要先从 Hive Metastore 中取出对应的表元数据文件路径。
HadoopCatalog 将当前表元数据文件路径记录在一个文件目录下,因此不需要连接 Hive Metastore。
不同 EMR 版本中节点的域名命名方式可能不同,所以下方示例中“emr-master-1”可参考 EMR 的域名规则做相应调整。
示例: 创建 HiveCatalog
CREATE CATALOG iceberg WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://emr-master-1:9083', 'clients'='5', 'property-version'='1', 'warehouse'='hdfs://emr-cluster/warehouse/tablespace/managed/hive' );
示例:创建 HadoopCatalog
CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='hdfs://emr-cluster/warehouse/tablespace/managed/hive', 'property-version'='1' );
通过Flink SQL Client检查结果:
Flink SQL> SHOW CATALOGS; +-----------------+ | catalog name | +-----------------+ | default_catalog | | hadoop_catalog | | iceberg | +-----------------+ 3 rows in set
更多 SQL 命令,详情参考 Flink 官网文档。
USE CATALOG iceberg; CREATE DATABASE iceberg_db; USE iceberg_db;
CREATE TABLE iceberg.iceberg_db.iceberg_001 ( id BIGINT COMMENT 'unique id', data STRING ) WITH ( 'write.format.default'='ORC' );
--提交 flink批处理作业来获取iceberg表中的所有行 SET execution.runtime-mode = batch; SELECT * FROM iceberg.iceberg_db.iceberg_001 limit 10;
--从flink流作业中增量获取数据 SET execution.runtime-mode = streaming; SELECT * FROM iceberg.iceberg_db.iceberg_001 limit 10;
INSERT OVERWRITE只适合于batch job。通过下面命令设置batch job执行模式:
SET execution.runtime-mode = batch;
INSERT INTO iceberg.iceberg_db.iceberg_001 VALUES (1, 'a');
也可以使用 INSERT OVERWRITE 来使用查询结果替换表中的数据:
INSERT OVERWRITE iceberg.iceberg_db.iceberg_001 VALUES (1, 'a');
DROP TABLE iceberg.iceberg_db.iceberg_001;