Celeborn Introduction
Celeborn Data Path
- Mappers lazily ask LifecycleManager to registerShuffle.
- LifecycleManager requests slots from Master.
- Workers reserve slots at LifecycleManager's request and create the corresponding files.
- Mappers get worker locations from LifecycleManager.
- Mappers push data to specified workers.
- Workers merge data and replicate it to their peers.
- Workers flush to disk periodically.
- Mapper tasks finish and trigger a MapperEnd event.
- When all mapper tasks are complete, workers commit files.
- Reducers ask for file locations.
- Reducers read shuffle data.
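The steps above can be sketched as a toy in-memory simulation. This is not Celeborn's implementation: the class and method names (`Master.request_slots`, `Worker.reserve_slot`, `LifecycleManager.mapper_end`, `run_shuffle`) are hypothetical, slots are assigned round-robin purely for illustration, and replication and periodic disk flushes are left out.

```python
from collections import defaultdict


class Master:
    """Assigns each partition to a worker (round-robin, for illustration only)."""

    def __init__(self, workers):
        self.workers = workers

    def request_slots(self, num_partitions):
        return {p: self.workers[p % len(self.workers)] for p in range(num_partitions)}


class Worker:
    def __init__(self, name):
        self.name = name
        self.buffers = defaultdict(list)  # partition id -> pushed, uncommitted records
        self.committed = {}               # partition id -> records visible to reducers

    def reserve_slot(self, partition):
        self.buffers[partition]  # touching the key stands in for creating the file

    def push(self, partition, record):
        self.buffers[partition].append(record)

    def commit(self):
        self.committed = {p: list(records) for p, records in self.buffers.items()}


class LifecycleManager:
    def __init__(self, master):
        self.master = master
        self.locations = None
        self.finished_mappers = set()

    def register_shuffle(self, num_partitions):
        # Lazy: only the first mapper to ask triggers slot allocation.
        if self.locations is None:
            self.locations = self.master.request_slots(num_partitions)
            for partition, worker in self.locations.items():
                worker.reserve_slot(partition)
        return self.locations

    def mapper_end(self, mapper_id, num_mappers):
        self.finished_mappers.add(mapper_id)
        if len(self.finished_mappers) == num_mappers:
            # All mappers are done, so workers commit their files.
            for worker in set(self.locations.values()):
                worker.commit()


def run_shuffle(records_per_mapper, num_partitions):
    """records_per_mapper: one list of (int_key, value) pairs per mapper."""
    workers = [Worker("w0"), Worker("w1")]
    lm = LifecycleManager(Master(workers))
    for mapper_id, records in enumerate(records_per_mapper):
        locations = lm.register_shuffle(num_partitions)
        for key, value in records:
            partition = key % num_partitions
            locations[partition].push(partition, value)
        lm.mapper_end(mapper_id, len(records_per_mapper))
    # Reducers ask for file locations, then read the committed data.
    return {p: lm.locations[p].committed[p] for p in range(num_partitions)}
```

Calling `run_shuffle` with two mappers and two partitions returns, per partition, the values in mapper order, which mirrors the register, push, commit, read sequence in the list.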
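Bottleneck Analysis for Extreme Partition/Task Counts
The Celeborn shuffle service uses a Master-Worker architecture: the Master handles partition and IO scheduling, and Workers carry the IO coming from Spark. After extensive performance tuning, Celeborn can schedule millions of partitions within seconds. Hard constraints from the OS kernel and physical hardware, however, mean that jobs with very large numbers of partitions or tasks need more Celeborn Workers and more Driver memory. Specifically:
- File descriptor limit: a physical machine can hold only a limited number of fds (roughly 1,000,000), and each partition and each running task consumes one fd.
- Available cluster disk capacity: the Celeborn Master dynamically evaluates estimatePartitionSize, and the cluster's available disk capacity should be at least estimatePartitionSize * PartitionCount.
- Disk IO throughput limit: the IO load and data coming from jobs are spread evenly across the Celeborn Workers.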
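As a rough illustration of the first two constraints, the sketch below checks whether a given cluster fits a workload. The function name `check_cluster` and its parameters are hypothetical. It assumes partitions are spread evenly across workers and, as a worst case, that every running task holds `fds_per_task` fds on every worker; neither assumption comes from Celeborn itself.

```python
FD_LIMIT_PER_HOST = 1_000_000  # "roughly 1,000,000" fds per physical machine


def check_cluster(num_workers, disk_bytes_per_worker, partition_count,
                  running_tasks, estimated_partition_size, fds_per_task=1):
    """Return a small report on fd and disk headroom (illustrative only)."""
    # Even spread of partitions across workers, rounded up.
    partitions_per_worker = -(-partition_count // num_workers)
    # Assumption: each running task keeps fds open on every worker.
    fds_per_worker = partitions_per_worker + running_tasks * fds_per_task
    # Disk rule from the text: capacity > estimatePartitionSize * PartitionCount.
    disk_needed = estimated_partition_size * partition_count
    disk_available = num_workers * disk_bytes_per_worker
    return {
        "fds_per_worker": fds_per_worker,
        "fd_ok": fds_per_worker <= FD_LIMIT_PER_HOST,
        "disk_needed": disk_needed,
        "disk_ok": disk_available > disk_needed,
    }
```

For example, `check_cluster(10, 2**40, 1_000_000, 100_000, 512 * 1024)` models ten workers with 1 TiB each, one million partitions of 512 KiB, and 100,000 running tasks.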
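Recommended Parameters for Million-Scale Partitions & Tasks
Celeborn Cluster Capacity Recommendations
- Total cluster disk capacity: [total partition count (accounting for concurrent jobs, concurrent stages, etc.)] * [average partition size] * [replica count]
- Number of cluster nodes: ([total partition count (accounting for concurrent jobs, concurrent stages, etc.)] * [replica count] + 2 * [concurrent task count (accounting for concurrent jobs, concurrent stages, etc.)] * [number of cluster nodes]) / 1,000,000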
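A small calculator can make these two rules concrete. It reads the node-count rule as nodes = (partitions * replicas + 2 * tasks * nodes) / 1,000,000, which is an interpretation of the line above. Because the node count appears on both sides, the sketch solves for it: nodes = partitions * replicas / (1,000,000 - 2 * tasks). The function names are hypothetical.

```python
FD_LIMIT_PER_NODE = 1_000_000  # the divisor in the node-count rule


def required_disk_bytes(partitions, avg_partition_bytes, replicas):
    """Total disk capacity: partitions * average partition size * replicas."""
    return partitions * avg_partition_bytes * replicas


def required_nodes(partitions, replicas, concurrent_tasks,
                   fd_limit=FD_LIMIT_PER_NODE):
    """Solve nodes = (partitions*replicas + 2*tasks*nodes) / fd_limit for nodes."""
    # fds left on each node once every concurrent task has taken its share.
    free_fds_per_node = fd_limit - 2 * concurrent_tasks
    if free_fds_per_node <= 0:
        raise ValueError("2 * concurrent_tasks must stay below the per-node fd limit")
    total_partition_fds = partitions * replicas
    # Integer ceiling division, with a floor of one node.
    return max(1, -(-total_partition_fds // free_fds_per_node))
```

With 3,000,000 partitions, 2 replicas, and 200,000 concurrent tasks, each node has 600,000 fds left for partitions, so `required_nodes` returns 10. The same workload at 512 KiB per partition needs 3,145,728,000,000 bytes of disk.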
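Celeborn Master Parameter Recommendations
celeborn.master.estimatedPartitionSize.minSize=512kb # minimum value of the estimated average partition size
celeborn.master.estimatedPartitionSize.initialSize=512kb # initial value of the estimated average partition size
celeborn.master.estimatedPartitionSize.update.initialDelay=5min # delay before the first automatic estimate of average partition size
celeborn.master.estimatedPartitionSize.update.interval=10min # interval between automatic estimates of average partition size
celeborn.master.heartbeat.worker.timeout=300s # timeout for Worker heartbeats
celeborn.master.fast.slot.assign.enabled=true # switch that enables fast slot assignment for very large partition counts
celeborn.master.large.request.threshold=300000 # threshold above which a request counts as very large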
Spark Driver Parameter Recommendations
- Celeborn parameters:
  - spark.celeborn.rpc.askTimeout=600s
  - spark.celeborn.masterClient.rpc.askTimeout=1000s
  - spark.celeborn.client.rpc.registerShuffle.askTimeout=1000s
  - spark.celeborn.client.push.replicate.enabled=false [Disables replication. This halves the load on the Celeborn cluster but increases the impact of a single-node failure on jobs.]
  - spark.celeborn.client.spark.shuffle.fallback.enabled=false
- Resource parameters:
  - Cores: more than 10
  - Memory: 23.34 GB per 1,000,000 tasks
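To show how these recommendations might be applied, the sketch below builds `spark-submit` arguments for a given task count. It assumes driver memory scales linearly at 23.34 GB per 1,000,000 tasks, which is an extrapolation from the single figure above, and uses 11 cores as one value satisfying "more than 10". The helper names are hypothetical; only the timeout and replication keys listed above are included.

```python
import math

# Timeout and replication settings copied from the list above.
DRIVER_CONF = {
    "spark.celeborn.rpc.askTimeout": "600s",
    "spark.celeborn.masterClient.rpc.askTimeout": "1000s",
    "spark.celeborn.client.rpc.registerShuffle.askTimeout": "1000s",
    "spark.celeborn.client.push.replicate.enabled": "false",
}

GB_PER_MILLION_TASKS = 23.34


def driver_memory_gb(task_count):
    """Assumed linear scaling of driver memory, rounded up to whole GB."""
    return math.ceil(GB_PER_MILLION_TASKS * task_count / 1_000_000)


def spark_submit_args(task_count, driver_cores=11):
    """Return the driver sizing and --conf flags as a list of CLI arguments."""
    args = [
        "--driver-cores", str(driver_cores),
        "--driver-memory", f"{driver_memory_gb(task_count)}g",
    ]
    for key, value in DRIVER_CONF.items():
        args += ["--conf", f"{key}={value}"]
    return args
```

The returned list can be appended to a `spark-submit` command line; keeping the settings in one dictionary makes it easy to review them against the recommendations.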