You need to enable JavaScript to run this app.
导航
Hive 作业调优
最近更新时间:2024.05.08 17:05:42首次发布时间:2022.09.22 16:55:46

本文将为您介绍如何通过调整内存、CPU 和 Task 个数等方式,实现 Hive 作业调优。

1 调优方案总览

调优方向调优方案
代码优化代码优化

参数调优

  • 内存参数

  • CPU 参数

  • 开启向量化

  • Task 数量优化

  • 合并小文件

2 代码优化

2.1 数据清洗

  • 数据过滤之后再 JOIN。

  • 重复使用数据时,避免重复计算,构建中间表,重复使用中间表。

  • 读取表时分区过滤,避免全表扫描。

2.2 多 distinct 优化

  • 优化前代码

    SELECT g,
           COUNT(DISTINCT CASE WHEN a > 1 THEN user_id) cnt_user1,
           COUNT(DISTINCT CASE WHEN a > 2 THEN user_id) cnt_user2,
           COUNT(DISTINCT CASE WHEN a > 3 THEN user_id) cnt_user3,
           COUNT(DISTINCT CASE WHEN a > 4 THEN user_id) cnt_user4
    FROM tbl 
    GROUP BY g
    

    多个 distinct ,计算过程中,数据会膨胀。

  • 优化后代码

    SELECT g,
           SUM(CASE WHEN user1 > 0 THEN 1 END) as cnt_user1,
           SUM(CASE WHEN user2 > 0 THEN 1 END) as cnt_user2,
           SUM(CASE WHEN user3 > 0 THEN 1 END) as cnt_user3,
           SUM(CASE WHEN user4 > 0 THEN 1 END) as cnt_user4
    FROM 
            (SELECT g,user_id,
                    COUNT(CASE WHEN a > 1 THEN user_id) user1,
                    COUNT(CASE WHEN a > 2 THEN user_id) user2,
                    COUNT(CASE WHEN a > 3 THEN user_id) user3,
                    COUNT(CASE WHEN a > 4 THEN user_id) user4
            FROM tbl
            GROUP BY g,user_id  
            ) tmp 
    GROUP BY g
    

    通过两次 group by 的方式代替 distinct 操作,通过内层的 group by 去重并降低数据量,通过外层的 group by 取 sum,可实现 distinct 的效果。

2.3 数据倾斜

数据倾斜一般出现在 group by 或 大表 join 时,某些 key 的数据量特别大,导致某些算子的计算量大大超过了其他算子。

  • group by 出现热点:

    1. 先开启 Map 端聚合
    SET hive.map.aggr=true;
    SET hive.groupby.mapaggr.checkinterval=200000; (用于设定Map端进行聚合操作的条目数,视场景而定)
    
    1. 对 Key 随机化打散,多次聚合
    SET hive.groupby.skewindata=true;
    

    当 hive.groupby.skewindata 设置为 true 时,生成的执行计划有两个 MapReduce 任务。

    • 在第一个 MapReduce 中,Map 的输出结果集会随机分布到 Reduce 中,每个部分进行聚合操作,并输出结果。这样带来的效果是:相同的 Group By Key 可能分发到不同的 Reduce 中,达到负载均衡的目的。

    • 第二个 MapReduce 任务会再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中,完成最终的聚合计算。

  • 大表 join 出现热点:

    策略是使热点 Key 随机化。
    例如,event 表存在大量 user_id 为 null 的记录,但是表 users 中不会存在 user_id 为空的记录,可以把 null 随机化再 join,这样就避免了 null 值都分发到一个 Reduce Task 上。

    SELECT * FROM event a LEFT OUTER 
    JOIN users b ON 
        CASE WHEN a.user_id IS NULL THEN CONCAT(‘dp_hive’,RAND()) 
        ELSE a.user_id=b.user_id 
        END;
    

3 参数调优

3.1 内存参数

可以在 EMR 集群的 MapReduce2 服务下,通过设置以下参数,对 Map 和 Reduce 阶段的内存进行调优:

  • Map

    参数描述示例
    mapreduce.map.java.opts默认参数,表示JVM堆内存。-Xmx2048m
    mapreduce.map.memory.mb默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256。2304
  • Reduce

    参数描述示例
    mapreduce.reduce.java.opts默认参数,表示JVM堆内存。-Xmx2048m
    mapreduce.reduce.memory.mb默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256。2304

3.2 CPU 参数

可以通过设置以下参数,对Map和Reduce任务的CPU进行调优。

参数描述
mapreduce.map.cpu.vcores每个Map任务可用的最多的CPU Core数目。
mapreduce.reduce.cpu.vcores每个Reduce任务可用的最多的CPU Core数目。说明 此设置在公平队列是不生效的,通常vCores用于较大的集群,以限制不同用户或应用程序的CPU。

3.3 开启向量化

参数描述
hive.vectorized.execution.enabled默认值为false。表示是否开启向量化查询的开关, 建议性能测试时修改参数值为true。
hive.vectorized.execution.reduce.enabled默认值为false。表示是否启用Reduce任务的向量化执行模式,建议性能测试时修改参数值为true。

3.4 Task 数量优化

3.4.1 Map Task 数量优化

在分布式计算中,原始数据是决定 Map 数量的一个重要因素。优化的策略有:

  • 当原始数据文件数量特别多,并且每个文件特别小时,可以减少初始 Map 对应的 Task 数量,以减少计算资源浪费。

  • 如果文件数量较少,但是单个文件较大,可以增加 Map 的 Task 数量,减少单个 Task 的压力。

决定 Map Task 数量的参数有:

  • mapred.map.tasks

  • mapred.min.split.size

  • dfs.block.size

  1. 初始默认的 Map Task 数量计算逻辑

    default_mapper_num = total_size/dfs.block.size
    

    如果 Hive 文件存储在 HDFS 上。因为 HDFS 的文件都是分块存储,因此具体的 Hive 数据文件在 HDFS 上分多少块,就可能对应默认的 Map Task 数量。

  2. split_num 的计算逻辑

    default_split_size = max(mapred.min.split.size, 
                         min(mapred.max.split.size, dfs.block.size))
    split_num = total_size/default_split_size;
    
  3. Map Task 数量的计算逻辑

    map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
    

    如果要提高 Task 数量,就要降低 mappred.min.split.size 的值,在一定范围可以减小 default_split_size 的值,从而增加 split_num 的值,增大 mapred.map.tasks 的数量。

3.4.2 Reduce Task 数量优化

  • 通过 hive.exec.reducers.bytes.per.reducer 参数控制单个 Reduce 处理的字节数

    Reduce 的计算方法如下:

    reducer_num = min(
        total_size/hive.exec.reducers.bytes.per.reducers, 
        hive.exec.reducers.max)。
    
  • 通过 mapred.reduce.tasks 参数来设置 Reduce Task 的数量

    过多的 Reduce 会增加启动时间,消耗集群资源。有多少个 Reduce ,就会有多少个输出文件,如果生成了很多小文件,这些小文件作为下一个任务的输入,就会出现小文件过多的问题。

3.5 合并小文件

大量小文件容易在文件存储端造成瓶颈,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来处理。

参数描述
hive.merge.mapfiles默认值为true。表示是否合并Map输出文件。
hive.merge.mapredfiles默认值为false。表示是否合并Reduce输出文件。
hive.merge.size.per.task默认值为256000000,单位字节。表示合并文件的大小。