You need to enable JavaScript to run this app.
导航
Spark Native使用说明
最近更新时间:2024.11.11 14:27:22首次发布时间:2024.05.13 21:35:20

Spark Native简介

Spark Native是火山引擎 E-MapReduce(EMR)团队自研的统一湖仓分析加速C++引擎,使用了向量化计算、Codegen等加速技术。Spark Native采用Gluten+Spark Native的方式来加速Spark。

Spark Native整体架构

Spark Native的整体架构如下:
Image

其核心包含两部分:

  • 执行计划的转换和传递。通过 Substrait 来实现物理执行计划在 JVM 和 Native 的传递,Substrait 可以理解为是基于 google protobuf 的面向关系对象的跨语言序列化 SDK,Spark Native 也支持基于 Substrait 作为执行计划输入
  • 数据的传递。SparkSQL 的执行是 Row-based,算子操作的对象是 Row,而在向量化框架里,算子操作的对象是 ColumnarBatch,一个 ColumnarBatch 可以包含多行数据,ColumnarBatch 里每个字段的数据结构是 Column Vector

执行计划转换

执行计划的转换发生在 物理执行计划 阶段。这意味着 Plan 相关的优化规则对于Spark Native依然生效。
Image

统一的内存管理

Spark通过Unified Memory Manager来管理内存,以及协调不同算子之间的Spill 和 内存释放。Spark Native也接入并复用了这个能力。Spark Native 提供了一个 Memory Pool 来管理 Native 算子在运行时对内存的申请和释放,并且通过注册Listerner的方式调用 JNI 回调给 JVM 的 Task Memory Consumer,从而接入 Spark的Unified Memory Manager。
对于一个没有算子Fallback的 Spark Native任务,它的 OnHeap 使用应该是非常少的,所有和数据有关的算子都是用的 OffHeap。这也降低了在大内存场景下 GC 导致的性能损失。
Image

Runtime

从Runtime的视角看Spark Native如下
Image
可以看到,在每个Spark Task粒度上,通过JNI调用Spark Native来执行单个Task的计算逻辑。我们依然可以复用Spark健壮的调度框架,推测执行,task/stage 不同粒度的失败重试,以及丰富的生态,比如 yarn/k8s等。

Spark Native使用

目前Spark Native可以通过增加配置的方式启用。

固定参数配置

# Spark Native
set spark.plugins=io.glutenproject.GlutenPlugin;
set spark.shuffle.manager=org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager;
set spark.gluten.sql.columnar.numaBinding=false;
set spark.gluten.sql.columnar.coreRange=0-35,72-107|36-71,108-143;
set spark.gluten.sql.columnar.shuffleSplitDefaultSize=8192;
set spark.gluten.sql.columnar.forceShuffledHashJoin=true;
set spark.sql.bucket.union.enable=false;
set spark.sql.optimizer.dynamicDataPruning.enabled=false;
set spark.sql.optimizer.dynamicDataPruning.datasource.enabled=false;
set spark.sql.mergeGetMapValue.enabled=false;
set spark.sql.collapseGetJsonObject.enabled=false;
set spark.sql.CollapseExpensiveExprMode=None;
set spark.sql.execution.useObjectHashAggregateExec=false;
set spark.sql.optimizer.partialAggregationOptimization.enabled=false;
set spark.sql.aggregate.adaptivePartialAggregationThreshold=0;
set spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=80;
set spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=100000;
set spark.gluten.sql.columnar.maxBatchSize=32768;
set spark.gluten.sql.columnar.maxBatchByteSize=41943040;
set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAggregates,org.apache.spark.sql.hive.auth.GeminiSparkAuthorization;
set spark.sql.gluten.customHadoopConfEnforce.enabled=false;
set spark.kubernetes.driverEnv.CPP_HDFS_CONF=/opt/tiger/hadoop_conf/core-site.xml;
set spark.executorEnv.CPP_HDFS_CONF=/opt/tiger/hadoop_conf/core-site.xml;
set spark.kubernetes.driverEnv.LD_PRELOAD=/opt/tiger/spark_deploy/spark-stable/native_lib/libjemalloc.so;
set spark.executorEnv.LD_PRELOAD=/opt/tiger/spark_deploy/spark-stable/native_lib/libjemalloc.so;
set spark.kubernetes.driverEnv.HADOOP_CONF_DIR=/opt/tiger/hadoop_conf;
set spark.executorEnv.HADOOP_CONF_DIR=/opt/tiger/hadoop_conf;

内存配置

Spark Native计算使用非堆内存,所以在启用Spark Native的时候对应要修改Spark内存使用,默认情况下我们会配置以下2个参数:

// 堆内内存
spark.executor.memory 12G
// 预留非堆内存
spark.executor.memoryOverhead 4096

启用Spark Native后,建议设置参数如下:

// 堆内内存,建议保留原来堆内内存的25%
set spark.executor.memory=3G;
// 预留非堆内存,建议不变
set spark.executor.memoryOverhead=4096;

// 启用offHeap内存
set spark.memory.offHeap.enabled=true;
// offHeap内存大小,建议设置为原来堆内内存的75%
set spark.memory.offHeap.size=9G;

//注:启用Spark Native后
spark.executor.memory+spark.memory.offHeap.size=原spark.executor.memory

注意

由于SQL专用资源队列会预热拉起,不支持重新配置内存,如果需要更改内存配置,请使用公共队列或者具有通用资源的队列,并设置如下参数:
set tqs.query.engine.type = sparkcli;
如果的确希望在SQL专用资源更改内存配置,请提 工单联系我们。