You need to enable JavaScript to run this app.
导航
AutoScale:使用AdaptiveStreamingExecutor
最近更新时间:2024.08.14 12:03:59首次发布时间:2024.07.01 16:39:46

与StreamingExecutor相比,AdaptiveStreamingExecutor可以自适应地调整每个map算子的并发度,更加灵活高效。

配置

DataContext中配置

说明

默认值

enable_adaptive_execute

是否激活AdaptiveStreamingExecutor

False

task_backlog_timeout_secs

task处于等待调度状态的最长持续时间,超过此时间则触发scale up

1s

actor_kill_timeout_secs

actor闲置多久未使用回收,触发scale down

60s

使用实例

DataContext.get_current().enable_adaptive_execute = True
  • 默认配置为False,map算子翻译成ActorPoolMapOperator
InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> ActorPoolMapOperator[MapBatches(SDTransformer)] -> TaskPoolMapOperator[Write]
  • 使用AdaptiveStreamingExecutor,map算子翻译成AdaptiveMapOperator
InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> AdaptiveMapOperator[MapBatches(SDTransformer)] -> TaskPoolMapOperator[Write]

注意

在concurrency参数指定的范围,可动态调整map算子(不包括read和write)并发度,如concurrency=(10, 40),会使用10-40个actor进行任务调度与计算。

与Ray集群的AutoScale配合使用

Ray集群AutoScale可以根据负载情况自动弹缩物理节点,同时激活Ray Data侧和Ray集群侧两层AutoScale能力,可以更高效地使用资源。