火山引擎 E-MapReduce(EMR)支持通过 Spark、Flink 、 Hive 、Presto和Trino 等引擎对 Hudi 表进行读写操作。创建EMR集群,并安装Hudi服务后,EMR已经默认将Hudi相关依赖集成到Flink、Spark、Hive、Trino、Presto开源组件中,无需额外配置。
Hudi 整体支持情况如下:
查询操作 | 写入操作 | |||
---|---|---|---|---|
COW | MOR | COW | MOR | |
Spark | 支持 | 支持 | 支持 | 支持 |
Flink | 支持 | 支持 | 支持 | 支持 |
Presto | 支持 | 支持 | 不支持 | 不支持 |
Trino | 支持 | 支持 | 不支持 | 不支持 |
Hive | 支持 | 支持 | 不支持 | 不支持 |
可以通过Spark 3.x和Spark-2.4.3+对 Hudi 进行读写操作。我们更推荐使用 SparkSQL 的 SQL 语法来操作 Hudi,可以极大的简化 Hudi 的使用成本。使用的基本方法详见 Hudi 使用说明-基础使用。
本段主要介绍如何使用 Spark ThriftServer 配置连接 Hudi。
说明
目前只有EMR 2.x版本才支持Spark ThriftServer。
登录 EMR 控制台。
在左侧导航栏中,进入集群详情 > 服务列表 > Spark > 服务参数界面。
安装完 Hudi 后,可以到 sparkthriftserver 配置页面,找到 spark-defaults 中的 spark.sql.extensions 加上 org.apache.spark.sql.hudi.HoodieSparkSessionExtension (如果已有存在的值,用逗号隔开)
选择自定义配置,在 spark-thrift-sparkconf 添加新的选项 spark.serializer,写入值 org.apache.spark.serializer.KyroSerializer
对于 EMR 1.3 版本,需要额外增加一个配置,EMR 1.2 版本不需要该步骤
选项的key为:spark.sql.catalog.spark_catalog
选项的value为:org.apache.spark.sql.hudi.catalog.HoodieCatalog
单击确定按钮,完成参数配置。
单击右上角服务操作 > 重启按钮,重启 Spark 全部组件。
使用 beeline 连接 sparkthriftserver 用于测试, 参考 LDAP 文档和 Spark最佳实践 ,来配置用户名密码进行 sparkthriftserver 连接。
beeline -u jdbc:hive2://emr-30f8q2lxxxxxxxxxx-master-1:10000/default -n xxxx -p xxxxx
接着您即可使用标准的 SparkSQL 操作 Hudi 表。
对于已有的外表,我们也可通过 SparkSQL 将外表数据导入到 hudi 表中,下方是一个很小的 lineitem 表,将其保存为文本文件,上传。
文本样例内容如下表所示
1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the| 1|67310|7311|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold | 1|63700|3701|3|8|13309.60|0.10|0.02|N|O|1996-01-29|1996-03-05|1996-01-31|TAKE BACK RETURN|REG AIR|riously. regular, express dep| 1|2132|4633|4|28|28955.64|0.09|0.06|N|O|1996-04-21|1996-03-30|1996-05-16|NONE|AIR|lites. fluffily even de| 1|24027|1534|5|24|22824.48|0.10|0.04|N|O|1996-03-30|1996-03-14|1996-04-01|NONE|FOB| pending foxes. slyly re| 1|15635|638|6|32|49620.16|0.07|0.02|N|O|1996-01-30|1996-02-07|1996-02-03|DELIVER IN PERSON|MAIL|arefully slyly ex| 2|106170|1191|1|38|44694.46|0.00|0.05|N|O|1997-01-28|1997-01-14|1997-02-02|TAKE BACK RETURN|RAIL|ven requests. deposits breach a| 3|4297|1798|1|45|54058.05|0.06|0.00|R|F|1994-02-02|1994-01-04|1994-02-23|NONE|AIR|ongside of the furiously brave acco| 3|19036|6540|2|49|46796.47|0.10|0.00|R|F|1993-11-09|1993-12-20|1993-11-24|TAKE BACK RETURN|RAIL| unusual accounts. eve| 3|128449|3474|3|27|39890.88|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |
通过 scp/或者其它方式上传数据集到集群
scp lineitem_small.tbl root@master_ip
登陆集群进行上传
ssh master_ip export HADOOP_USER_NAME=hive hadoop fs -mkdir /user/hive/lineitem hadoop fs -put lineitem_small.tbl /user/hive/lineitem/
使用 Beeline 等方式连接 SparkThriftServer,参考 1.2 使用方式。
使用 DDL 建表用于加载源文件
create external table lineitem ( l_orderkey int, l_partkey int, l_suppkey int, l_linenumber int, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string, l_shipinstruct string, l_shipmode string, l_comment string) row format delimited fields terminated by '|' stored as textfile location '/user/hive/lineitem';
创建 Hudi 表并使用 SparkSQL 导入外部数据到 hudi_lineitem 表中
create table hudi_lineitem using hudi tblproperties (type = 'cow', primaryKey = 'l_orderkey') select * from lineitem;
查询 Hudi 表,确认数据导入Hudi成功。
SELECT l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order FROM hudi_lineitem WHERE l_shipdate <= date '1998-12-01' - interval '90' day GROUP BY l_returnflag, l_linestatus ORDER BY l_returnflag, l_linestatus;
接着您就可以使用 Hudi 的特性对其中的数据进行增删改查操作。您可以参考 Hudi 使用说明中的语法,详见使用说明 。
EMR Presto 已与 Hudi 深度集成,所以您无需进行额外的配置,即可查询 Hudi 表数据。目前 Presto 支持 Hudi 表查询,不支持 Hudi 表写入。因此 Presto 需要在 Spark 端提前导入数据用于查询测试。
可以使用的presto cli连接测试,或者参考 presto 基础使用文档。
presto --user <username> --password --catalog hive
可以直接使用上文 SparkSQL 导入的样例表进行查询。
use default; show tables; SELECT l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order FROM hudi_lineitem WHERE l_shipdate <= date '1998-12-01' - interval '90' day GROUP BY l_returnflag, l_linestatus ORDER BY l_returnflag, l_linestatus;
Trino 和 Hudi 同为可选组件,需要在集群组件可选列表里面安装。 安装完 Hudi 后,只需要重启 Trino 全部组件,即可使 Trino 能够查询 Hudi 表。
连接 Trino 请参考 Trino 使用文档,配置 Trino 的 cli 连接字符串:
trino --user <username> --password --catalog hive
集成后即可安装标准的 Trino SQL 语法完整查询 COW 表。对于 MOR 表,Trino 支持有限,因此不推荐在 Trino 中使用 MOR 表。
use default; show tables; select * from hudi_cow_nonpcf_tbl;
如果是创建EMR集群后,才安装Hudi组件,则需要在成功安装Hudi后,重启Hive相关服务,否则不需要做额外配置。
使用 beeline 连接 hiveserver2 进行测试,在 -n -p 参数后指定对应的用户名密码,以及选择正确的hiveserver2地址。
beeline -u jdbc:hive2://master-1-1:10000/default -n xxx -p xxxx
查询 MOR 表时,需要先执行以下命令,来设置对应的参数,COW 表无需额外配置:
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
在 HiveServer2 中正常查询 Hudi 表即可。
select * from hudi_cow_nonpcf_tbl; select count(1) from hudi_cow_nonpcf_tbl;