You need to enable JavaScript to run this app.
导航
Spark 调优指南
最近更新时间:2024.07.11 11:05:10首次发布时间:2024.07.11 11:05:10

1 cpu 和 mem 配置

根据任务需求和硬件资源,合理配置 Spark 的 executor 数量(设置--num-executors)和 executor 配置(配置--executor-cores--executor-memory)。
对于 cpu 密集型任务(多见于数据科学场景下),总的 task 并行度应该设置为总的核数(或总核数 -1 以防止系统无法响应其他任务)
对于 mem 密集型任务(多数 spark sql 作业如此),应当根据 Yarn 可分配内存合理配置单个 executor 的内存大小。注意,--executor-memory 指的是 jvm 堆内存,其实际使用的内存要视情况而定:

  1. 如果仅仅是 scala/java 作业,或者 pyspark 的 python 算子部分没有额外的内存消耗,总内存使用大约是堆内存的 1.1 倍(请参考官网对于spark.executor.memoryOverhead 相关介绍)
  2. 如果是 pyspark 作业,python 算子内有数据结构,这部分内存是不被 spark 管理的。这个时候要合理估计 python 的内存使用

2 并行度配置

主要涉及两个参数

  1. spark.default.parallelism:影响 spark rdd 作业的并行度
  2. spark.sql.shuffle.partitions:影响 spark sql/streaming 作业的并行度

注意,上述参数仅仅是对 spark 的一个参考,并不一定保证真正的 task 数量等于设置的并行度

3 数据倾斜处理:

当数据中存在数据倾斜时,可以使用以下方法来处理:

  • 使用合适的数据分区策略,将数据均匀分布到各个节点上。
  • 使用 Spark 提供的解决数据倾斜的机制,如repartition()groupBy()的优化等。
  • 如果是 SQL 作业,可以通过增加 salt 属性来优化倾斜
  • 打开 Adaptive Query Execution(AQE,3.5.1 是默认开启的)

4 缓存与持久化

Spark 是一个 lazy execution 的机制。当对一个 dataframe 执行的 transformation 操作过长时,整个 dag 的链路会很长,而其中有一些部分可能是可以重复使用的,这样会有两个问题:

  1. 可以重复使用的算子完全可以避免重复计算
  2. 当 dag 很复杂时,spark 优化器的优化效果会打折扣,自动的算子重用可能会失效

在这种情况下,通常有如下优化手段:

  1. 当内存容量足够时,可以使用cache() 算子将计算结果临时缓存在内存
  2. 当内存有限时,可以使用 persist() 算子将计算结果持久化
  3. 使用临时表将原有的比较复杂的 dag/pipeline 拆分,即将结果写入临时表,这样一方面能获得更加稳定的表现,二能够在作业出错的时候方便排查问题

5 JVM 配置

通过以下两个参数为 driver 和 executor JVM 配置合理的参数项,比如设置 GC 算法,GC 打印等等

spark.driver.extraJavaOptions
spark.executor.extraJavaOptions

6 Remove Shuffle Service(RSS)

Remote shuffle service 相比 Spark 原生 shuffle 有一系列的优点,包括

  1. 提高性能:RSS 使用 push based 的 shuffle,能在计算的过程中就把数据存到远端,并且远端的数据存储是 partition 连续的,有效的避免了大量的随机读,提高 shuffle 效率
  2. 提高作业执行的稳定性:在传统的shuffle过程中,如果某个节点出现故障,可能会导致整个任务失败。而使用 RSS,即使某个节点出现故障,其他节点仍然可以继续处理数据,从而提高了作业的稳定性。另外,push bashed shuffle 能减小 executor 端的内存压力,进一步提升作业的稳定性
  3. 减少网络传输压力:在传统的shuffle过程中,数据需要在节点之间传输最高是 M * N,使用 RSS 将这个复杂度降低到了 M+N,从而减轻了网络传输的压力。
  4. 支持大规模数据处理:由于 RSS 可以将数据在不同的节点之间进行分发,因此它可以支持大规模的数据处理任务,这是传统的shuffle方式无法做到的。