You need to enable JavaScript to run this app.
导航
3.1.4 Spark任务
最近更新时间:2022.09.05 11:25:29首次发布时间:2022.09.05 11:25:29
我的收藏
有用
有用
无用
无用

使用场景

spark任务,具体分为Jar spark任务和python spark任务。

Spark 使用说明

  1. Spark insert overwrite 使用示例
    df.write.insertInto("aaa\\\_bbb\\\_doc\\\_html\\\_detail", True)
  2. 用户自定义参数较多时,且存在K-V参数时,可参考如下配置
    Spark程序中,如果自定义参数较多, 可直接在"自定义参数"输入框中配置,使用空格分隔,数据开发运行时,会将自定义参数拼接到main class之后,配置输入参数格式需符合spark main class程序解析方式。
    如下图
    alt
    如需使用时间变量,也可以直接在自定义参数中配置${date}/${DATE}等系统变量。
  3. Spark优化参数
    参数列表
参数描述默认值
spark.executor.instances静态资源下:executor数2
spark.executor.cores每个executor和CPU数4
spark.dynamicAllocation.enabled动态资源开关false
spark.dynamicAllocation.maxExecutors动态资源下:executor的最大个数500
spark.executor.memory每个executor的内存大小8g
spark.memory.fractionexecutor用于计算的内存比例,剩余部分用于存储元数据和运行信息。对于executor内存开的较大的任务,可以适当提高这个值,让更多内存参与计算,但会增加OOM风险0.6

spark.executor.memoryOverhead/
spark.yarn.executor.memoryOverhead

每个executor的堆外内存大小,堆外内存主要用于数据IO,对于报堆外OOM的任务要适当调大,单位Mb,与之配合要调大executor JVM参数,例如:
set spark.executor.memoryOverhead=3072
set spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2560m

6144

spark.sql.adaptive.enabledAdaptive execution开关,包含自动调整并行度,解决数据倾斜等优化true
spark.sql.adaptive.minNumPostShufflePartitionsAE相关,动态最小的并行度1
spark.sql.adaptive.maxNumPostShufflePartitionsAE相关,动态最大的并行度,对于shuffle量大的任务适当增大可以减少每个task的数据量,如10241000
spark.sql.adaptive.join.enabledAE相关,开启后能够根据数据量自动判断能否将sortMergeJoin转换成broadcast jointrue
spark.sql.adaptiveBroadcastJoinThresholdAE相关,spark.sql.adaptive.join.enabled设置为true后会判断join的数据量是否小于该参数值,如果小于则能将sortMergeJoin转换成broadcast joinspark.sql.autoBroadcastJoinThreshold
spark.sql.adaptive.skewedJoin.enabledAE相关,开启后能够自动处理join时的数据倾斜,对于数据量明显高于中位数的task拆分成多个小taskfalse
spark.sql.adaptive.skewedPartitionFactorAE相关,数据倾斜判定标准,当同一stage的某个task数据量超过中位数的N倍,将会判定为数据倾斜5
spark.sql.adaptive.skewedPartitionMaxSplitsAE相关,被判定为数据倾斜后最多会被拆分成的份数5
spark.shuffle.accurateBlockThresholdAE相关,数据倾斜判定基于shuffle数据量统计,如果统计所有的block数据,消耗内存较大,因此设有阈值,当shuffle的单个数据块超过大小和行数阈值时,才会进入统计,这个参数即大小阈值100*1024*1024(100MB)
spark.shuffle.accurateBlockRecordThresholdAE相关,同上,行数阈值,如果设置了上面的数据倾斜处理开关,仍然倾斜,可能是因为这几个参数设得偏大,适当缩小2 * 1024 * 1024
spark.sql.files.maxPartitionBytes默认一个task处理的数据大小,如果给的太小会造成最终任务task太多,太大会是输入环节计算较慢1073741824
spark.vcore.boost.ratiovcore,虚拟核数,设置大于1的数可以使一个核分配多个task,对于简单sql可以提升CPU利用率,对于复杂任务有OOM风险1
spark.shuffle.hdfs.enabled(长任务推荐)HDFS based Spark Shuffle开关,可以提高任务容错性。遇到org.apache.spark.shuffle.FetchFailedException报错需设置false

set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;

一般在开启hdfs shuffle后还可以开启这两个参数,避免不必要的重试和等待

spark.sql.crossJoin.enabled对于会产生笛卡尔积的sql,默认配置是限制不能跑的,在hive里可以配置set hive.mapred.mode=nonstrict跳过限制,相对应的在spark里可以配置set spark.sql.crossJoin.enabled=true起到同样的效果。false
spark.sql.broadcastTimeoutbroadcast joins时,广播数据最长等待时间,网络不稳定时,容易出现超时造成任务失败,可适当增大此参数。300(单位:s)
spark.sql.autoBroadcastJoinThreshold表能够使用broadcast join的最大阈值10MB
spark.network.timeout网络连接超时参数120s
spark.maxRemoteBlockSizeFetchToMemreduce端获取的remote block存放到内存的阈值,超过该阈值后数据会写磁盘,当出现数据量比较大的block时,建议调小该参数(比如512MB)。Long.MaxValue
spark.reducer.maxSizeInFlight控制从一个worker拉数据缓存的最大值48m
spark.merge.files.enabled合并输出文件,如果insert结果的输出文件数很多,希望合并,可以设为true,会多增加一个repartition stage合并文件,repartition的分区数由spark.merge.files.number控制false
spark.merge.files.number控制合并输出文件的输出数量512
spark.speculation推测执行开关。如果是原生任务很有可能没开这个参数,会出现个别task拖慢整个任务,可以开启这个参数。true
spark.speculation.multiplier开启推测执行的时间倍数阈值:当某个任务运行时间/中位数时间大于该值,触发推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。1.5
spark.speculation.quantile同一个stage中的task超过这个参数比例的task完成后,才会开启推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。0.75
spark.default.parallelismSpark Core默认并发度,原生spark程序并发度设置200
spark.sql.shuffle.partitionsSpark SQL默认并发度,AE开启后被spark.sql.adaptive.maxNumPostShufflePartitions取代200
spark.sql.sources.bucketing.enabled分桶表相关,当设置为false,会将分桶表当作普通表来处理。做为普通表会忽略分桶特性,部分情况性能会下降。但如果分桶表没有被正确生成(即表定义是分桶表,但数据未按分桶表生成)会报错RuntimeException: Invalid bucket file,避免这个错误,要将这个参数设为falsetrue
spark.sql.partition.rownum.collect.enable统计生成固定分区表行数false
spark.sql.dynamic.partition.rownum.collect.enable统计生成动态分区表行数false