与StreamingExecutor相比,AdaptiveStreamingExecutor可以自适应地调整每个map算子的并发度,更加灵活高效。
DataContext中配置 | 说明 | 默认值 |
---|---|---|
enable_adaptive_execute | 是否激活AdaptiveStreamingExecutor | False |
task_backlog_timeout_secs | task处于等待调度状态的最长持续时间,超过此时间则触发scale up | 1s |
actor_kill_timeout_secs | actor闲置多久未使用回收,触发scale down | 60s |
DataContext.get_current().enable_adaptive_execute = True
ActorPoolMapOperator
InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> ActorPoolMapOperator[MapBatches(SDTransformer)] -> TaskPoolMapOperator[Write]
AdaptiveMapOperator
InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> AdaptiveMapOperator[MapBatches(SDTransformer)] -> TaskPoolMapOperator[Write]
注意
在concurrency参数指定的范围,可动态调整map算子(不包括read和write)并发度,如concurrency=(10, 40)
,会使用10-40个actor进行任务调度与计算。
Ray集群AutoScale可以根据负载情况自动弹缩物理节点,同时激活Ray Data侧和Ray集群侧两层AutoScale能力,可以更高效地使用资源。