在分布式计算中,内存管理和负载均衡是两个关键问题。Ray 提供了一系列工具和方法来帮助开发者监控和优化内存使用,确保集群在高负载下仍能稳定运行。本文将介绍如何解决 Ray 集群的压力负载,以及如何定位和解决内存溢出的问题。
通过EMR管控平台提供Ray集群的监控,查看集群的资源使用情况:包括节点资源情况和Task的监控信息。
示例
ray memory
命令也可以采用ray memory
命令查看内存使用情况的详细报告:
该命令会输出当前对象存储中的对象及其内存使用情况,帮助识别哪些对象占用了大量内存。
Ray 会在日志中记录内存使用情况。如果某个任务因为内存不足而失败,也可以在日志中找到相关信息。
日志文件通常位于 /tmp/ray/session_latest/logs
目录下或/var/log/emr/ray/session_latest/logs
。
也可以在Ray dashboard界面上查看日志。
在 Ray 集群中,是有一个Head和多个worker节点组成,其中:Head 节点扮演着至关重要的角色。它不仅负责集群的管理和协调,还承担了许多关键功能:集群管理、任务调度、日志和监控等等。如果 Head 节点出现故障,整个集群的运行将受到严重影响。
因此在大规模集群中,建议将 Head 节点专用于集群管理任务,而不用于执行计算任务,即在Head节点上避免启动worker Process。通过在启动Ray Head的参数中添加“ num-cpus: '0'
”,将会阻止具有非零 CPU 需求的 Ray 工作负载被调度到Head中。
示例
以RayJob的方式为例,下面对应的Yaml文件,在启动Head时配置“num-cpus: '0'
”:
apiVersion: ray.io/v1 kind: RayJob metadata: name: rayjob-sample annotations: nginx.ingress.kubernetes.io/rewrite-target: /$1 spec: entrypoint: python /home/ray/samples/sample_code.py # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller. rayClusterSpec: rayVersion: '2.30.0' # should match the Ray version in the image of the containers # Ray Head pod template HeadGroupSpec: rayStartParams: dashboard-host: '0.0.0.0' num-cpus: '0' template: spec: containers: - name: ray-Head image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.30.0-py3.11-ubuntu20.04-207-1.5.0 ports: - containerPort: 6379 name: gcs-server - containerPort: 8265 # Ray dashboard name: dashboard - containerPort: 10001 name: client resources: limits: cpu: "5" memory: 20Gi requests: cpu: "5" memory: 20Gi volumeMounts: - mountPath: /home/ray/samples name: code-sample volumes: - name: code-sample configMap: # Provide the name of the ConfigMap you want to mount. name: ray-job-code-sample # An array of keys from the ConfigMap to create as files items: - key: sample_code.py path: sample_code.py workerGroupSpecs: # the pod replicas in this group typed worker - replicas: 1 minReplicas: 1 maxReplicas: 5 # logical group name, for this called small-group, also can be functional groupName: small-group # The `rayStartParams` are used to configure the `ray start` command. # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. rayStartParams: {} #pod template template: spec: containers: - name: ray-worker image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.30.0-py3.11-ubuntu20.04-207-1.5.0 lifecycle: preStop: exec: command: [ "/bin/sh","-c","ray stop" ] resources: limits: cpu: "5" memory: 20Gi requests: cpu: "5" memory: 20Gi --- apiVersion: v1 kind: ConfigMap metadata: name: ray-job-code-sample data: sample_code.py: | import ray import time def map1(n): time.sleep(1) return n def column_udf_class(col, udf): class UDFClass: def __call__(self, row): return {col: udf(row[col])} return UDFClass ray.data.range(20).map_batches( column_udf_class("id", map1), concurrency=(10), num_cpus = 0.5 ).show()
该用例中,执行的作业脚本/home/ray/samples/sample_code.py
中,启动10个task作业。在Ray的UI中可以看到这10个作业都在worker node上运行。
在高并发环境下,多个任务同时运行会占用大量的内存资源,可能导致内存不足问题。通过减少并发度,可以降低同时运行的任务数量,从而减少内存占用,避免 OOM 问题。
配置num_cpus
参数。
通过增加ray.remote()中的 num_cpus
参数,可以控制任务的并发度。增加 CPU 要求可以减少同时运行的任务数量,从而降低内存使用。
示例
import ray ray.init() # 定义一个内存密集型任务,并增加 num_cpus 参数 @ray.remote(num_cpus=2) # 每个任务需要 2 个 CPU def memory_intensive_task(): # 模拟内存密集型操作 large_array = [0] * (10**8) return len(large_array) # 提交多个任务 tasks = [memory_intensive_task.remote() for _ in range(20)] # 获取结果 results = ray.get(tasks) print(results)
ray.data模块,有些api可以通过配置concurrency调整并发度。
**示例 **
ray.data.Dataset.map_batches
import ray import time # 初始化 Ray ray.init() # 定义两个模拟的 UDF(用户定义函数),每个函数都会睡眠 1 秒钟 def map1(n): time.sleep(1) return n def map2(n): time.sleep(1) return n # 定义一个类,用于将列名和 UDF 结合起来 def column_udf_class(col, udf): class UDFClass: def __call__(self, row): return {col: udf(row[col])} return UDFClass # 创建一个包含 1000 个整数的 Ray Dataset dataset = ray.data.range(1000) # 使用 map_batches 处理数据集,并设置并发度 processed_dataset = ( dataset.map_batches( column_udf_class("id", map1), concurrency=2, # 设置并发度为 2 ) .map_batches( column_udf_class("id", map2), concurrency=3, # 设置并发度为 3 ) ) # 显示处理后的数据集 processed_dataset.show()
在启动 Ray 集群时通过调大worker和Head的内存,,可以减少内存不足的风险。特别是在处理大规模数据集或内存密集型任务时,增加内存配置可以显著提高任务的成功率和集群的稳定性。
在任务完成后,及时删除不再需要的对象,以减少对象存储的内存使用。可以使用 ray._private.internal_api.free
函数来手动删除不再需要的对象。
示例
import numpy as np import ray ray.init() @ray.remote def create_large_object(): return np.zeros(1000000) object_id = create_large_object.remote() # 使用对象 result = ray.get(object_id) # 删除对象 ray._private.internal_api.free([object_id])