Apache Flink 是一个可分布式的开源计算框架,能够支持数据流处理和批量数据处理两种应用类型。本文介绍下在 Flink 中操作 Iceberg 表。
E-MapReduce(EMR)1.4.0版本之后的版本(包括1.4.0版本)支持在 Flink 中操作 Iceberg 表。
EMR 2.1.0 版本之后的版本(包含2.1.0版本)支持在 Flink 中操作 Iceberg 表。
已创建 EMR 集群,安装有 Iceberg 组件和 Flink 组件。有两种方式可以安装Iceberg和Flink组件:
本文介绍如何采用 Flink SQL 方式操作 Iceberg 表。 如果您希望采用 Flink DataStream API 来访问Apache Iceberg 表,则请参考 Iceberg官网 进行操作。
配置 Flink 的 checkpoint
因为目前 Flink 是在每次 checkpoint 的时候提交 Iceberg 的信息,所以需要配置 flink 的checkpoint。
在 集群管理 > 集群列表 > 具体集群名称,进入集群详情 界面.
导航栏中点击 服务列表,点击 Iceberg 服务并进入。
点击emr集群节点的ECS ID,跳转进入到云服务器的实例界面,点击右上角的 远程连接 按钮,输入集群创建时的root密码,进入远程终端。
在 /usr/lib/emr/current/flink/conf/flink-conf.yaml 文件的 checkpoint参数下,添加如下配置:
execution.checkpointing.interval: 10s # checkpoint间隔时间 execution.checkpointing.tolerable-failed-checkpoints: 10 # checkpoint 失败容忍次数
yaml文件配置完成后,使用以下命令,启动Flink SQL Client:
export HADOOP_CLASSPATH=`hadoop classpath` cd /usr/lib/emr/current/flink # 拷贝Iceberg需要的依赖 cp /usr/lib/emr/current/iceberg/lib/iceberg-flink-runtime-*.jar lib cp /usr/lib/emr/current/flink/connectors/flink-sql-connector-hive-*.jar lib # 采用 flink on yarn模式,启动YARN Session ./bin/yarn-session.sh --detached # Start the flink SQL client ./bin/sql-client.sh embedded shell
说明
不同 EMR 版本,Flink 和 Iceberg 的版本号可能不同。建议采用如下命令来定位您的 iceberg-flink-runtime-xx.xx.xx.jar 和 flink-sql-connector-hive-xx.xx.xx.jar 路径。
EMR3.x 和 EMR1.x 版本采用下面命令:
$ ls /usr/lib/emr/current/iceberg/lib/iceberg-flink-runtime-*.jar
/usr/lib/emr/current/flink/connectors/flink-sql-connector-hive-3*.jar
EMR3.x 和 EMR1.x 版本采用下面命令:
$ ls /usr/lib/emr/current/iceberg/lib/iceberg-flink-runtime-*.jar
/usr/lib/emr/current/flink/connectors/flink-sql-connector-hive-2.3.6*.jar
创建 Catalog
CREATE CATALOG <catalog_name> WITH ( 'type'='iceberg', '<config_key>'='<config_value>' );
目前 Iceberg 主要支持 HiveCatalog 和 HadoopCatalog。
HiveCatalog 将当前表的元数据文件路径存储在 Hive Metastore,所以每次读写 Iceberg 表都需要先从 Hive Metastore 中取出对应的表元数据文件路径。
HadoopCatalog 将当前表元数据文件路径记录在一个文件目录下,因此不需要连接 Hive Metastore。
说明
不同 EMR 版本中节点的域名命名方式可能不同,所以下方示例中“emr-master-1”可参考 EMR 的域名规则做相应调整。
示例: 创建 HiveCatalog
CREATE CATALOG iceberg WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://emr-master-1:9083', 'clients'='5', 'property-version'='1', 'warehouse'='hdfs://emr-cluster/warehouse/tablespace/managed/hive' );
示例:创建 HadoopCatalog
CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='hdfs://emr-cluster/warehouse/tablespace/managed/hive', 'property-version'='1' );
通过Flink SQL Client检查结果:
Flink SQL> SHOW CATALOGS; +-----------------+ | catalog name | +-----------------+ | default_catalog | | hadoop_catalog | | iceberg | +-----------------+ 3 rows in set
更多 SQL 命令,详情参考 Flink 官网文档。
创建数据库
USE CATALOG iceberg; CREATE DATABASE iceberg_db; USE iceberg_db;
创建表
CREATE TABLE iceberg.iceberg_db.iceberg_001 ( id BIGINT COMMENT 'unique id', data STRING ) WITH ( 'write.format.default'='ORC' );
查询
可以执行下面的命令把执行类型设置为流式处理模式或者批处理模式。
--提交 flink批处理作业来获取iceberg表中的所有行 SET execution.runtime-mode = batch; SELECT * FROM iceberg.iceberg_db.iceberg_001 limit 10;
--从flink流作业中增量获取数据 SET execution.runtime-mode = streaming; SELECT * FROM iceberg.iceberg_db.iceberg_001 limit 10;
写入
说明
INSERT OVERWRITE只适合于batch job。通过下面命令设置batch job执行模式:
SET execution.runtime-mode = batch;
INSERT INTO iceberg.iceberg_db.iceberg_001 VALUES (1, 'a');
也可以使用 INSERT OVERWRITE 来使用查询结果替换表中的数据:
INSERT OVERWRITE iceberg.iceberg_db.iceberg_001 VALUES (1, 'a');
删除表
DROP TABLE iceberg.iceberg_db.iceberg_001;
说明
删除表,也会将表的数据进行删除。但会留存表的空目录信息,如: