本文将为您介绍如何通过调整内存、CPU 和 Task 个数等方式,实现 Hive 作业调优。
调优方向 | 调优方案 |
---|---|
代码优化 | 代码优化 |
参数调优 |
|
数据过滤之后再 JOIN。
重复使用数据时,避免重复计算,构建中间表,重复使用中间表。
读取表时分区过滤,避免全表扫描。
优化前代码
SELECT g, COUNT(DISTINCT CASE WHEN a > 1 THEN user_id) cnt_user1, COUNT(DISTINCT CASE WHEN a > 2 THEN user_id) cnt_user2, COUNT(DISTINCT CASE WHEN a > 3 THEN user_id) cnt_user3, COUNT(DISTINCT CASE WHEN a > 4 THEN user_id) cnt_user4 FROM tbl GROUP BY g
多个 distinct ,计算过程中,数据会膨胀。
优化后代码
SELECT g, SUM(CASE WHEN user1 > 0 THEN 1 END) as cnt_user1, SUM(CASE WHEN user2 > 0 THEN 1 END) as cnt_user2, SUM(CASE WHEN user3 > 0 THEN 1 END) as cnt_user3, SUM(CASE WHEN user4 > 0 THEN 1 END) as cnt_user4 FROM (SELECT g,user_id, COUNT(CASE WHEN a > 1 THEN user_id) user1, COUNT(CASE WHEN a > 2 THEN user_id) user2, COUNT(CASE WHEN a > 3 THEN user_id) user3, COUNT(CASE WHEN a > 4 THEN user_id) user4 FROM tbl GROUP BY g,user_id ) tmp GROUP BY g
通过两次 group by 的方式代替 distinct 操作,通过内层的 group by 去重并降低数据量,通过外层的 group by 取 sum,可实现 distinct 的效果。
数据倾斜一般出现在 group by 或 大表 join 时,某些 key 的数据量特别大,导致某些算子的计算量大大超过了其他算子。
group by 出现热点:
SET hive.map.aggr=true; SET hive.groupby.mapaggr.checkinterval=200000; (用于设定Map端进行聚合操作的条目数,视场景而定)
SET hive.groupby.skewindata=true;
当 hive.groupby.skewindata 设置为 true 时,生成的执行计划有两个 MapReduce 任务。
在第一个 MapReduce 中,Map 的输出结果集会随机分布到 Reduce 中,每个部分进行聚合操作,并输出结果。这样带来的效果是:相同的 Group By Key 可能分发到不同的 Reduce 中,达到负载均衡的目的。
第二个 MapReduce 任务会再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中,完成最终的聚合计算。
大表 join 出现热点:
策略是使热点 Key 随机化。
例如,event 表存在大量 user_id 为 null 的记录,但是表 users 中不会存在 user_id 为空的记录,可以把 null 随机化再 join,这样就避免了 null 值都分发到一个 Reduce Task 上。
SELECT * FROM event a LEFT OUTER JOIN users b ON CASE WHEN a.user_id IS NULL THEN CONCAT(‘dp_hive’,RAND()) ELSE a.user_id=b.user_id END;
可以在 EMR 集群的 MapReduce2 服务下,通过设置以下参数,对 Map 和 Reduce 阶段的内存进行调优:
Map
参数 | 描述 | 示例 |
---|---|---|
mapreduce.map.java.opts | 默认参数,表示JVM堆内存。 | -Xmx2048m |
mapreduce.map.memory.mb | 默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256。 | 2304 |
Reduce
参数 | 描述 | 示例 |
---|---|---|
mapreduce.reduce.java.opts | 默认参数,表示JVM堆内存。 | -Xmx2048m |
mapreduce.reduce.memory.mb | 默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256。 | 2304 |
可以通过设置以下参数,对Map和Reduce任务的CPU进行调优。
参数 | 描述 |
---|---|
mapreduce.map.cpu.vcores | 每个Map任务可用的最多的CPU Core数目。 |
mapreduce.reduce.cpu.vcores | 每个Reduce任务可用的最多的CPU Core数目。说明 此设置在公平队列是不生效的,通常vCores用于较大的集群,以限制不同用户或应用程序的CPU。 |
参数 | 描述 |
---|---|
hive.vectorized.execution.enabled | 默认值为false。表示是否开启向量化查询的开关, 建议性能测试时修改参数值为true。 |
hive.vectorized.execution.reduce.enabled | 默认值为false。表示是否启用Reduce任务的向量化执行模式,建议性能测试时修改参数值为true。 |
在分布式计算中,原始数据是决定 Map 数量的一个重要因素。优化的策略有:
当原始数据文件数量特别多,并且每个文件特别小时,可以减少初始 Map 对应的 Task 数量,以减少计算资源浪费。
如果文件数量较少,但是单个文件较大,可以增加 Map 的 Task 数量,减少单个 Task 的压力。
决定 Map Task 数量的参数有:
mapred.map.tasks
mapred.min.split.size
dfs.block.size
初始默认的 Map Task 数量计算逻辑
default_mapper_num = total_size/dfs.block.size
如果 Hive 文件存储在 HDFS 上。因为 HDFS 的文件都是分块存储,因此具体的 Hive 数据文件在 HDFS 上分多少块,就可能对应默认的 Map Task 数量。
split_num 的计算逻辑
default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size)) split_num = total_size/default_split_size;
Map Task 数量的计算逻辑
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
如果要提高 Task 数量,就要降低 mappred.min.split.size 的值,在一定范围可以减小 default_split_size 的值,从而增加 split_num 的值,增大 mapred.map.tasks 的数量。
通过 hive.exec.reducers.bytes.per.reducer 参数控制单个 Reduce 处理的字节数
Reduce 的计算方法如下:
reducer_num = min( total_size/hive.exec.reducers.bytes.per.reducers, hive.exec.reducers.max)。
通过 mapred.reduce.tasks 参数来设置 Reduce Task 的数量
过多的 Reduce 会增加启动时间,消耗集群资源。有多少个 Reduce ,就会有多少个输出文件,如果生成了很多小文件,这些小文件作为下一个任务的输入,就会出现小文件过多的问题。
大量小文件容易在文件存储端造成瓶颈,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来处理。
参数 | 描述 |
---|---|
hive.merge.mapfiles | 默认值为true。表示是否合并Map输出文件。 |
hive.merge.mapredfiles | 默认值为false。表示是否合并Reduce输出文件。 |
hive.merge.size.per.task | 默认值为256000000,单位字节。表示合并文件的大小。 |