You need to enable JavaScript to run this app.
导航
Spark开启动态伸缩功能时的最佳实践
最近更新时间:2024.12.30 15:27:02首次发布时间:2024.11.26 20:39:49

Dynamic Resource Allocation简介

Spark 提供了一种机制,可以根据工作负载动态调整应用程序占用的资源。这意味着,如果资源不再被使用,应用程序可以将资源归还给集群,并在有需求时再次请求这些资源。如果多个应用程序共享 Spark 集群中的资源,这个功能特别有用。此特性在Yarn 模式和 Kubernetes 模式下都可以使用。默认情况下,这个功能是禁用的。
可以通过下述配置项开启DRA(后续用DRA代表Dynamic Resource Allocation):

spark.dynamicAllocation.enabled=true

当启用动态资源分配时,[minExecutors, maxExecutors] 范围决定了引擎可以从集群管理器获取多少资源。
一方面,minExecutors 指定了 Spark 至少要保留多少个Executor。如果设置得太接近 0(默认值),当集群管理器非常繁忙且持续时间较长时,引擎可能会缺乏资源。然而,minExecutors 设置得越大,引擎在空闲时间内可能浪费的资源就越多。另一方面,maxExecutors 决定了引擎可以达到的Executor的上限。从单个引擎的角度来看,这个值越大越好,以便处理更重的查询。然而,从整个集群的资源角度来看,我们必须将其限制在一个合理的范围内。否则,一个大型查询可能会导致运行它的引擎从队列/命名空间中消耗过多资源,并占用这些资源相当长的时间,这对于高效使用资源来说可能会存在浪费。在这种情况下,我们更希望这样的巨大任务在有限的并发量下更慢地完成。
可以通过下述配置设置minExecutors和maxExecutors:

spark.dynamicAllocation.minExecutors=minExecutors
spark.dynamicAllocation.maxExecutors=maxExecutors
spark.dynamicAllocation.initialExecutors=initialExecutors

spark.dynamicAllocation.initialExecutors 可以用来决定在引擎启动期间请求多少个Spark executors。
它们之间的大小关系应该是 minExecutors <= initialExecutors < maxExecutors

资源分配策略介绍

开启DRA后,当DRA注意到当前资源不足以应对当前工作负载时,它将请求更多的Spark executor。

默认情况下,DRA会根据需要处理的任务数量请求足够的Executor,以最大化并行度。如下图所示:

虽然这可以最小化作业的延迟,但对于小任务来说,默认行为可能会因为Executor分配的开销而浪费许多资源,因为有些Executor可能根本没有工作。
在这种情况下,我们可以将 spark.dynamicAllocation.executorAllocationRatio 调低一些,以减少相对于完全并行度的Executor数量。例如,设置为 0.5,意思是将把目标Executor数量除以 2,如下图所示:

在完成一个任务后,Spark Driver 会为具有可用核心的Executor调度一个新任务。当待处理任务变得越来越少时,一些Executor由于没有新的任务到来而变得空闲。如果一个Executor达到了最大超时时间,它将被移除。相关超时配置如下所示:

spark.dynamicAllocation.executorIdleTimeout=60s

如果DRA发现有待处理任务积压超过了超时时间,将会请求新的Executor,这由以下两个配置项控制:

spark.dynamicAllocation.schedulerBacklogTimeout=1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5s

实际的请求会在待处理任务积压超过**spark.dynamicAllocation.schedulerBacklogTimeout** 秒时触发,如果待处理任务的队列持续存在,则每隔 **spark.dynamicAllocation.sustainedSchedulerBacklogTimeout **秒再次触发。此外,每轮请求的Executor数量会以指数方式增加。例如,一个应用程序在第一轮会添加1个Executor,然后在后续轮次中分别添加2个、4个、8个,以此类推。
采用指数增加策略的动机有两个方面。首先,应用程序在开始时应该谨慎地请求Executor,以防发现只需要少量额外的Executor即可满足需求。这与TCP慢启动的原理相呼应。其次,如果发现实际上需要大量Executor,应用程序应该能够及时增加其资源使用量。

最佳实践

开启动态伸缩时,无论针对Yarn模式还是Kubernetes模式,优先推荐使用Remote shuffle service,否则开启动态伸缩会提示错误,spark关联celeborn相关配置如下所示:

spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager
#如果使用Yarn ESS时需要开启此参数
spark.shuffle.service.enabled=false 
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.celeborn.master.endpoints=celeborn master的endpoints
spark.celeborn.client.spark.shuffle.writer=hash
spark.celeborn.client.push.replicate.enabled=true

关闭动态伸缩时,使用Local shuffle与Remote shuffle service都可以,如果要使用Local shuffle,改为下述配置即可:

#为空则会取默认ShuffleManager
spark.shuffle.manager=