Bolt is a Spark on Native engine solution developed in-house by the Volcano Engine E-MapReduce (EMR) team. It accelerates Spark using a Gluten + Spark Native approach.
The overall architecture of Bolt is as follows:
Its core consists of two parts:
The conversion of the execution plan happens at the physical plan stage. This means that plan-related optimization rules still take effect for Spark Native (a quick way to verify this with EXPLAIN is sketched below).
Spark manages memory through the Unified Memory Manager, which also coordinates spill and memory release across operators. Spark Native plugs into and reuses this capability: it provides a Memory Pool that manages memory allocation and release for native operators at runtime, and by registering a Listener it calls back via JNI into the JVM's Task Memory Consumer, thereby hooking into Spark's Unified Memory Manager.
For a Spark Native task with no operator fallback, on-heap usage should be very small, since all data-related operators use off-heap memory. This also reduces the performance loss caused by GC in large-memory scenarios.
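Coming back to the plan-conversion point above: a quick way to check whether a given query's physical plan was actually converted is to inspect it with EXPLAIN. This is a minimal sketch; it assumes the Gluten plugin is already enabled and that lt_testdb.test_table exists, and the exact names of the converted operator nodes in the output vary by Gluten/Bolt version.
-- Inspect the physical plan; converted operators appear as native/columnar nodes
-- (exact node names depend on the Gluten/Bolt version).
EXPLAIN FORMATTED
SELECT name, count(*) AS cnt
FROM lt_testdb.test_table
GROUP BY name;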
From the runtime perspective, Spark Native looks as follows:
As shown, at the granularity of each Spark task, Spark Native is invoked via JNI to execute the computation logic of that single task. We can therefore still reuse Spark's robust scheduling framework, speculative execution, failure retries at task/stage granularity, and its rich ecosystem such as YARN/Kubernetes.
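Because scheduling stays on the Spark side, the familiar scheduling and fault-tolerance knobs continue to work unchanged with the native engine. A minimal sketch using standard Spark configurations (the values are illustrative, not recommendations):
-- Standard Spark scheduling/fault-tolerance settings; they apply to native tasks as well.
set spark.speculation=true;                 -- speculative execution
set spark.task.maxFailures=4;               -- task-level retry attempts
set spark.stage.maxConsecutiveAttempts=4;   -- stage-level retry attempts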
Currently, Spark on Bolt can be enabled by adding the following configuration.
-- Spark Native
set spark.plugins=io.glutenproject.GlutenPlugin;
set spark.shuffle.manager=org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager;
set spark.gluten.sql.columnar.numaBinding=false;
set spark.gluten.sql.columnar.coreRange=0-35,72-107|36-71,108-143;
set spark.gluten.sql.columnar.shuffleSplitDefaultSize=8192;
set spark.gluten.sql.columnar.forceShuffledHashJoin=true;
set spark.sql.bucket.union.enable=false;
set spark.sql.optimizer.dynamicDataPruning.enabled=false;
set spark.sql.optimizer.dynamicDataPruning.datasource.enabled=false;
set spark.sql.mergeGetMapValue.enabled=false;
set spark.sql.collapseGetJsonObject.enabled=false;
set spark.sql.CollapseExpensiveExprMode=None;
set spark.sql.execution.useObjectHashAggregateExec=false;
set spark.sql.optimizer.partialAggregationOptimization.enabled=false;
set spark.sql.aggregate.adaptivePartialAggregationThreshold=0;
set spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=80;
set spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=100000;
set spark.gluten.sql.columnar.maxBatchSize=32768;
set spark.gluten.sql.columnar.maxBatchByteSize=41943040;
set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAggregates,org.apache.spark.sql.hive.auth.GeminiSparkAuthorization;
set spark.sql.gluten.customHadoopConfEnforce.enabled=false;
set spark.kubernetes.driverEnv.CPP_HDFS_CONF=/opt/tiger/hadoop_conf/core-site.xml;
set spark.executorEnv.CPP_HDFS_CONF=/opt/tiger/hadoop_conf/core-site.xml;
set spark.kubernetes.driverEnv.LD_PRELOAD=/opt/tiger/spark_deploy/spark-stable/native_lib/libjemalloc.so;
set spark.executorEnv.LD_PRELOAD=/opt/tiger/spark_deploy/spark-stable/native_lib/libjemalloc.so;
set spark.kubernetes.driverEnv.HADOOP_CONF_DIR=/opt/tiger/hadoop_conf;
set spark.executorEnv.HADOOP_CONF_DIR=/opt/tiger/hadoop_conf;
Spark on Bolt uses off-heap memory for computation, so the Spark memory settings need to be adjusted accordingly when enabling it. By default we configure the following two parameters:
-- On-heap memory
spark.executor.memory 12G
-- Reserved off-heap (overhead) memory
spark.executor.memoryOverhead 4096
After enabling Spark on Bolt, the recommended settings are as follows:
-- On-heap memory; recommended to keep 25% of the original on-heap memory
set spark.executor.memory=3G;
-- Reserved off-heap (overhead) memory; recommended to keep unchanged
set spark.executor.memoryOverhead=4096;
-- Enable off-heap memory
set spark.memory.offHeap.enabled=true;
-- Off-heap memory size; recommended to set to 75% of the original on-heap memory
set spark.memory.offHeap.size=9G;
-- Note: with Spark Native enabled, spark.executor.memory + spark.memory.offHeap.size = the original spark.executor.memory
Note
Because SQL-dedicated resource queues are pre-warmed and launched in advance, they do not support reconfiguring memory. If you need to change the memory configuration, use a public queue or a queue with general-purpose resources, and set the following parameter:
set tqs.query.engine.type = sparkcli;
If you do need to change the memory configuration on SQL-dedicated resources, please submit a ticket to contact us.
Starting from software stack version 3.20.0, Spark on Bolt supports the following additional operators and functions.
The TABLESAMPLE (x ROWS) query form.
set emr.serverless.bolt.enabled=true;
drop table if exists lt_testdb.test_table;
create table lt_testdb.test_table AS
select 1 as id, 'aaa' as name
union all
select 2 as id, 'bbb' as name
union all
select 3 as id, 'ccc' as name;
SELECT * FROM lt_testdb.test_table TABLESAMPLE (1 ROWS);
Caching via the CACHE TABLE statement.
set emr.serverless.bolt.enabled=true;
set spark.gluten.sql.columnar.tableCache=true;
drop table if exists lt_testdb.test_table;
create table lt_testdb.test_table AS
select 1 as id, 'aaa' as name
union all
select 2 as id, 'bbb' as name
union all
select 3 as id, 'ccc' as name;
CACHE TABLE testCache OPTIONS ('storageLevel' 'MEMORY_ONLY') SELECT * FROM lt_testdb.test_table;
InMemoryTableScan is only available when --conf spark.gluten.sql.columnar.tableCache=true is set.
df = spark.sql("select 1234")
df.cache()
a = df.count()
print(a)
b = df.count()
print(b)
The parse_url function.
create table lt_test.test_table AS
select 1 as id, 'https://li.feishu.cn/sheets/<encryptToken>a6c89a17b90d01324d84f0eb81c83683467db385?sheet=a0eNWX' as url;
select id, parse_url(url, 'PATH') from lt_test.test_table;
The base64 and unbase64 functions.
drop table if exists lt_test.test_table;
create table lt_test.test_table AS
select 1 as id, 'SparkSQL' AS str, '537061726b2053514c' as base64_str;
select id, base64(str), base64(base64_str), unbase64('U3BhcmsgU1FM') from lt_test.test_table;
The array_sort function, with or without a comparator lambda.
drop table if exists lt_test.test_table;
create table lt_test.test_table AS
select 1 as id, array(5, 6, 1) as arr1, array('b', 'd', null, 'c', 'a') as arr2;
select id,
       array_sort(arr1, (left, right) -> case when left < right then -1 when left > right then 1 else 0 end),
       array_sort(arr2)
from lt_test.test_table;
select id,
       array_sort(arr1, (x, y) -> if(x > y, 1, if(x < y, -1, 0))),
       array_sort(arr2)
from lt_test.test_table;
The flatten function.
drop table if exists lt_test.test_table;
create table lt_test.test_table AS
select 1 as id, array(5, 6, 1) as arr1, array(3, 4) as arr2;
select id, flatten(array(arr1, arr2)) from lt_test.test_table;