You need to enable JavaScript to run this app.
导航
基础使用
最近更新时间:2024.04.10 14:37:08首次发布时间:2023.03.30 14:04:29

目前在创建火山引擎 E-MapReduce(EMR)集群,并且安装Hudi服务后,EMR已经默认将Hudi相关依赖集成到Flink、Spark、Hive、Trino、Presto开源组件中。计算任务读写Hudi时,不需要额外再引入相关的Hudi依赖。不同的EMR版本使用了不同的Hudi版本,具体信息如下:

Hudi 版本EMR 版本
Hudi 0.10.0EMR 1.3.1

Hudi 0.11.1

EMR 3.0.1 ~ EMR 3.1.1
EMR 2.0.1 ~ EMR 2.2.0

Hudi 0.12.2

EMR 3.2.1 ~ EMR 3.8.1
EMR 2.3.1 ~ EMR 2.4.0

Hudi 0.14.1EMR 3.9.1+

接下来将为您介绍 Hudi 的安装,并通过 Spark SQL 快速上手 Hudi 表和通过 Flink SQL 创建 Catalog/Table 相关内容。

1 Hudi 安装

Hudi 在创建 Hadoop 集群过程中作为可选组件安装,集群创建完成后确保 Hudi 组件可见并且状态是正常的。详见创建集群
如果在集群初始化时没有安装,也可以通过添加 Hudi 组件在已有集群上添加 Hudi。详见添加服务

2 通过 Spark SQL 快速上手 Hudi 表

EMR SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的 Hudi 表操作做一个说明,其他详细指南可以参考 Hudi高阶使用文档
要快速上手 Hudi,可以启动一个 SparkSQL 的本地 session 快速读取 Hudi 表,降低使用门槛。

启动方式

  • Spark2 Hudi 0.14以下版本
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
  • Spark2 Hudi 0.14+版本
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
  • Spark3 Hudi 0.14以下版本
# For Spark versions: 3.2+
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

# For Spark versions: 3.0 - 3.1
# Spark 3.1
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
  • Spark3 Hudi 0.14+版本
# For Spark versions: 3.2+
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

# For Spark versions: 3.0 - 3.1
spark-sql \
-conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

2.1 创建 Hudi

2.1.1 创建非分区 COW

create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
) using hudi;

2.1.2 创建分区 COW 表

create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;

2.1.3 创建 Hudi MOR 表

create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);

2.2 插入数据

COW 表和 MOR 表拥有相同的 SparkSQL 语法

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

2.3 更新数据

update hudi_ctas_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;

2.4 查询表数据

select * from hudi_ctas_cow_pt_tbl;

2.5 删除 Hudi

drop table hudi_cow_nonpcf_tbl;

Hudi 0.12 版本已经支持通过 Flink Catalog 来管理表信息,目前支持以下两种 Catalog:DFS Catalog 和 Hive Catalog

3.1 DFS Catalog

CREATE CATALOG dfs_catalog WITH (
    'type'='hudi',
    'catalog.path'='hdfs://emr-master-1:8020/warehouse/tablespace/managed/hive'
);

USE CATALOG dfs_catalog;

CREATE DATABASE hudi_dfs_db;

USE hudi_dfs_db;

CREATE TABLE `dfs_catalog`.`hudi_dfs_db`.`flink_hudi_mor_tbl`(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'precombine.field' = 'ts'
);

3.2 Hive Catalog

CREATE CATALOG hms_catalog WITH (
    'type'='hudi',
    'catalog.path'='hdfs://emr-master-1:8020/warehouse/tablespace/managed/hive',
    'hive.conf.dir'='/etc/emr/hive/conf/',
    'mode'='hms'
);

USE CATALOG hms_catalog;
CREATE DATABASE hudi_hms_db;

-- MOR 表
-- hive_sync.enabled默认等于true,在写数据成功后会额外将rt和ro同步到HMS中
CREATE TABLE `hms_catalog`.`hudi_hms_db`.`flink_hudi_mor_tbl`(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'precombine.field' = 'ts',
  'hive_sync.enabled' = 'true'
);