You need to enable JavaScript to run this app.
导航
如何解决 Ray 集群的压力负载及内存溢出问题
最近更新时间:2024.08.21 20:07:40首次发布时间:2024.08.21 20:07:40

在分布式计算中,内存管理和负载均衡是两个关键问题。Ray 提供了一系列工具和方法来帮助开发者监控和优化内存使用,确保集群在高负载下仍能稳定运行。本文将介绍如何解决 Ray 集群的压力负载,以及如何定位和解决内存溢出的问题。

监控 Ray 集群的内存使用情况

通过EMR管控台集群监控查看

通过EMR管控平台提供Ray集群的监控,查看集群的资源使用情况:包括节点资源情况和Task的监控信息。
示例

使用 ray memory 命令

也可以采用ray memory命令查看内存使用情况的详细报告:

该命令会输出当前对象存储中的对象及其内存使用情况,帮助识别哪些对象占用了大量内存。

通过Ray日志查看

Ray 会在日志中记录内存使用情况。如果某个任务因为内存不足而失败,也可以在日志中找到相关信息。
日志文件通常位于 /tmp/ray/session_latest/logs 目录下或/var/log/emr/ray/session_latest/logs
也可以在Ray dashboard界面上查看日志。

解决内存溢出的思路

降低Head节点的负载

在 Ray 集群中,是有一个Head和多个worker节点组成,其中:Head 节点扮演着至关重要的角色。它不仅负责集群的管理和协调,还承担了许多关键功能:集群管理、任务调度、日志和监控等等。如果 Head 节点出现故障,整个集群的运行将受到严重影响。
因此在大规模集群中,建议将 Head 节点专用于集群管理任务,而不用于执行计算任务,即在Head节点上避免启动worker Process。通过在启动Ray Head的参数中添加“ num-cpus: '0'”,将会阻止具有非零 CPU 需求的 Ray 工作负载被调度到Head中。
示例
以RayJob的方式为例,下面对应的Yaml文件,在启动Head时配置“num-cpus: '0'”:

apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /$1 
spec:
  entrypoint: python /home/ray/samples/sample_code.py
      
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.30.0' # should match the Ray version in the image of the containers
    # Ray Head pod template
    HeadGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
        num-cpus: '0'
      template:
        spec:
          containers:
            - name: ray-Head
              image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.30.0-py3.11-ubuntu20.04-207-1.5.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "5"
                  memory: 20Gi
                requests:
                  cpu: "5"
                  memory: 20Gi
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        #pod template
        template:
          spec:
            containers:
              - name: ray-worker
                image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.30.0-py3.11-ubuntu20.04-207-1.5.0
                lifecycle: 
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "5"
                    memory: 20Gi
                  requests:
                    cpu: "5"
                    memory: 20Gi
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import time
    def map1(n):
        time.sleep(1)
        return n

    def column_udf_class(col, udf):
        class UDFClass:
            def __call__(self, row):
                return {col: udf(row[col])}
        return UDFClass

    ray.data.range(20).map_batches(
            column_udf_class("id", map1),
            concurrency=(10),
            num_cpus = 0.5
        ).show()

该用例中,执行的作业脚本/home/ray/samples/sample_code.py中,启动10个task作业。在Ray的UI中可以看到这10个作业都在worker node上运行。
图片
图片

减少任务并发度

在高并发环境下,多个任务同时运行会占用大量的内存资源,可能导致内存不足问题。通过减少并发度,可以降低同时运行的任务数量,从而减少内存占用,避免 OOM 问题。

  • 配置num_cpus参数。
    通过增加ray.remote()中的 num_cpus 参数,可以控制任务的并发度。增加 CPU 要求可以减少同时运行的任务数量,从而降低内存使用。
    示例

    import ray
    
    ray.init()
    
    # 定义一个内存密集型任务,并增加 num_cpus 参数
    @ray.remote(num_cpus=2)  # 每个任务需要 2 个 CPU
    def memory_intensive_task():
        # 模拟内存密集型操作
        large_array = [0] * (10**8)
        return len(large_array)
    
    # 提交多个任务
    tasks = [memory_intensive_task.remote() for _ in range(20)]
    
    # 获取结果
    results = ray.get(tasks)
    
    print(results)
    
  • ray.data模块,有些api可以通过配置concurrency调整并发度。
    **示例 **
    ray.data.Dataset.map_batches

    import ray
    import time
    
    # 初始化 Ray
    ray.init()
    
    # 定义两个模拟的 UDF(用户定义函数),每个函数都会睡眠 1 秒钟
    def map1(n):
        time.sleep(1)
        return n
    
    def map2(n):
        time.sleep(1)
        return n
    
    # 定义一个类,用于将列名和 UDF 结合起来
    def column_udf_class(col, udf):
        class UDFClass:
            def __call__(self, row):
                return {col: udf(row[col])}
        return UDFClass
    
    # 创建一个包含 1000 个整数的 Ray Dataset
    dataset = ray.data.range(1000)
    
    # 使用 map_batches 处理数据集,并设置并发度
    processed_dataset = (
        dataset.map_batches(
            column_udf_class("id", map1),
            concurrency=2,  # 设置并发度为 2
        )
        .map_batches(
            column_udf_class("id", map2),
            concurrency=3,  # 设置并发度为 3
        )
    )
    
    # 显示处理后的数据集
    processed_dataset.show()
    

增加Ray节点的内存配置

在启动 Ray 集群时通过调大worker和Head的内存,,可以减少内存不足的风险。特别是在处理大规模数据集或内存密集型任务时,增加内存配置可以显著提高任务的成功率和集群的稳定性。

及时释放对象存储

在任务完成后,及时删除不再需要的对象,以减少对象存储的内存使用。可以使用 ray._private.internal_api.free 函数来手动删除不再需要的对象。
示例

import numpy as np
import ray

ray.init()

@ray.remote
def create_large_object():
    return np.zeros(1000000)

object_id = create_large_object.remote()

# 使用对象
result = ray.get(object_id)

# 删除对象
ray._private.internal_api.free([object_id])