目前在创建火山引擎 E-MapReduce(EMR)集群,并且安装Hudi服务后,EMR已经默认将Hudi相关依赖集成到Flink、Spark、Hive、Trino、Presto开源组件中。计算任务读写Hudi时,不需要额外再引入相关的Hudi依赖。不同的EMR版本使用了不同的Hudi版本,具体信息如下:
Hudi 版本 | EMR 版本 |
---|---|
Hudi 0.10.0 | EMR 1.3.1 |
Hudi 0.11.1 | EMR 3.0.1 ~ EMR 3.1.1 |
Hudi 0.12.2 | EMR 3.2.1 ~ EMR 3.8.1 |
Hudi 0.14.1 | EMR 3.9.1+ |
接下来将为您介绍 Hudi 的安装,并通过 Spark SQL 快速上手 Hudi 表和通过 Flink SQL 创建 Catalog/Table 相关内容。
Hudi 在创建 Hadoop 集群过程中作为可选组件安装,集群创建完成后确保 Hudi 组件可见并且状态是正常的。详见创建集群。
如果在集群初始化时没有安装,也可以通过添加 Hudi 组件在已有集群上添加 Hudi。详见添加服务。
EMR SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的 Hudi 表操作做一个说明,其他详细指南可以参考 Hudi高阶使用文档。
要快速上手 Hudi,可以启动一个 SparkSQL 的本地 session 快速读取 Hudi 表,降低使用门槛。
启动方式
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark versions: 3.2+ spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' # For Spark versions: 3.0 - 3.1 # Spark 3.1 spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# For Spark versions: 3.2+ spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' # For Spark versions: 3.0 - 3.1 spark-sql \ -conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
create table hudi_cow_nonpcf_tbl ( uuid int, name string, price double ) using hudi;
create table hudi_ctas_cow_pt_tbl using hudi tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts') partitioned by (dt) as select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
create table hudi_mor_tbl ( id int, name string, price double, ts bigint ) using hudi tblproperties ( type = 'mor', primaryKey = 'id', preCombineField = 'ts' );
COW 表和 MOR 表拥有相同的 SparkSQL 语法
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20; insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
update hudi_ctas_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;
select * from hudi_ctas_cow_pt_tbl;
drop table hudi_cow_nonpcf_tbl;
Hudi 0.12 版本已经支持通过 Flink Catalog 来管理表信息,目前支持以下两种 Catalog:DFS Catalog 和 Hive Catalog
CREATE CATALOG dfs_catalog WITH ( 'type'='hudi', 'catalog.path'='hdfs://emr-master-1:8020/warehouse/tablespace/managed/hive' ); USE CATALOG dfs_catalog; CREATE DATABASE hudi_dfs_db; USE hudi_dfs_db; CREATE TABLE `dfs_catalog`.`hudi_dfs_db`.`flink_hudi_mor_tbl`( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'precombine.field' = 'ts' );
CREATE CATALOG hms_catalog WITH ( 'type'='hudi', 'catalog.path'='hdfs://emr-master-1:8020/warehouse/tablespace/managed/hive', 'hive.conf.dir'='/etc/emr/hive/conf/', 'mode'='hms' ); USE CATALOG hms_catalog; CREATE DATABASE hudi_hms_db; -- MOR 表 -- hive_sync.enabled默认等于true,在写数据成功后会额外将rt和ro同步到HMS中 CREATE TABLE `hms_catalog`.`hudi_hms_db`.`flink_hudi_mor_tbl`( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'precombine.field' = 'ts', 'hive_sync.enabled' = 'true' );