In the Ray on VKE scenario, Ray jobs can be submitted as RayJob resources. When a RayJob is used, KubeRay automatically creates a RayCluster and submits the Ray job once the cluster is ready. KubeRay can also delete the RayCluster automatically after the Ray job finishes. This section walks through RayJob YAML manifests and usage examples for different scenarios.
The YAML below is a demo for a simple test scenario: the Ray job code is mounted into the RayCluster through a ConfigMap, which is suitable for quick code tests.
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  shutdownAfterJobFinishes: true
  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  ttlSecondsAfterFinished: 300
  runtimeEnvYAML: |
    env_vars:
      counter_name: "test_counter"
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.3' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        # pod template
        template:
          spec:
            containers:
              - name: ray-worker
                image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
                lifecycle:
                  preStop:
                    exec:
                      command: ["/bin/sh", "-c", "ray stop"]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    assert requests.__version__ == "2.31.0"
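A minimal way to try the demo, assuming the manifest above is saved locally as rayjob-sample.yaml and deployed into a namespace of your choice (file name and namespace are placeholders):

# Deploy the RayJob and its ConfigMap
kubectl apply -f rayjob-sample.yaml -n <namespace>
# Check the RayJob status reported by KubeRay
kubectl get rayjob rayjob-sample -n <namespace>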
You can follow the execution of the job above by checking the logs of the job submitter pod:
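For example (a sketch; the exact submitter pod name is generated by KubeRay, so look it up first):

# Find the submitter pod created for the RayJob, then tail its logs
kubectl get pods -n <namespace> | grep rayjob-sample
kubectl logs -f <rayjob-sample-submitter-pod> -n <namespace>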
INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-xg4mb-head-svc.test1.svc.cluster.local:8265
SUCC cli.py:60 -- ------------------------------------------------
SUCC cli.py:61 -- Job 'rayjob-sample-2m42f' submitted successfully
SUCC cli.py:62 -- ------------------------------------------------
INFO cli.py:285 -- Next steps
INFO cli.py:286 -- Query the logs of the job:
INFO cli.py:288 -- ray job logs rayjob-sample-2m42f
INFO cli.py:290 -- Query the status of the job:
INFO cli.py:292 -- ray job status rayjob-sample-2m42f
INFO cli.py:294 -- Request the job to be stopped:
INFO cli.py:296 -- ray job stop rayjob-sample-2m42f
INFO cli.py:303 -- Tailing logs until the job exits (disable with --no-wait):
INFO worker.py:1405 -- Using address 192.168.6.48:6379 set in the environment variable RAY_ADDRESS
INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 192.168.6.48:6379...
INFO worker.py:1715 -- Connected to Ray cluster. View the dashboard at 192.168.6.48:8265
test_counter got 1
test_counter got 2
test_counter got 3
test_counter got 4
test_counter got 5
SUCC cli.py:60 -- -----------------------------------
SUCC cli.py:61 -- Job 'rayjob-sample-2m42f' succeeded
SUCC cli.py:62 -- -----------------------------------
Because ttlSecondsAfterFinished is set to 300 seconds above, the RayCluster is released automatically 300 seconds after the job finishes.
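You can watch the cleanup happen, for example (assuming the same namespace as above):

# The RayCluster created for the job disappears roughly 300s after the job finishes
kubectl get raycluster -n <namespace> -w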
In real workloads you may want to open the RayCluster's Dashboard to inspect the job in more detail. It can be exposed as follows:
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
  annotations:
    # This annotation is required; without it, accessing the Dashboard returns 404
    nginx.ingress.kubernetes.io/rewrite-target: /$1
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  shutdownAfterJobFinishes: true
  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  ttlSecondsAfterFinished: 3000
  runtimeEnvYAML: |
    env_vars:
      counter_name: "test_counter"
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.3' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      # Enable ingress
      enableIngress: true
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        # pod template
        template:
          spec:
            containers:
              - name: ray-worker
                image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
                lifecycle:
                  preStop:
                    exec:
                      command: ["/bin/sh", "-c", "ray stop"]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    assert requests.__version__ == "2.31.0"
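Note that the Ingress object KubeRay creates when enableIngress is true assumes an NGINX Ingress Controller is installed in the cluster (hence the nginx rewrite annotation above). A quick sanity check, as a sketch:

# Confirm an nginx IngressClass / controller is available in the cluster
kubectl get ingressclass
kubectl get pods -A | grep ingress-nginx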
The ingress endpoint information can be obtained as follows:
kubectl get ingress -n <namespace where the YAML above is deployed>
kubectl describe ingress <ingress name> -n <namespace where the YAML above is deployed>
Combining the information above gives the final URL. Using the example output as a reference, http://VIP/rayjob-sample-raycluster-c59cq/ opens the Dashboard.
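To verify that the Dashboard is reachable through the ingress, a simple check (the VIP and the RayCluster name are placeholders taken from the example above):

# A 200 response indicates the Dashboard is exposed correctly
curl -I http://<VIP>/rayjob-sample-raycluster-c59cq/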
In some RayJob development scenarios, the disk backing the Pod's own mount paths may not be large enough, and a PVC needs to be mounted into the RayJob. Volcengine supports the following types of CSI plugins:
The following is a usage example of the csi-ebs plugin:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: test-pvc
spec:
  storageClassName: ebs-essd
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 40Gi
---
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  shutdownAfterJobFinishes: true
  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  ttlSecondsAfterFinished: 300
  runtimeEnvYAML: |
    env_vars:
      counter_name: "test_counter"
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.3' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
                - mountPath: "/var/data/"
                  name: ebs
          volumes:
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
            - name: ebs
              persistentVolumeClaim:
                claimName: test-pvc
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        # pod template
        template:
          spec:
            containers:
              - name: ray-worker
                image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
                lifecycle:
                  preStop:
                    exec:
                      command: ["/bin/sh", "-c", "ray stop"]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    assert requests.__version__ == "2.31.0"
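Before relying on the volume, it is worth confirming the PVC is bound (assuming the same namespace as the RayJob):

# STATUS should be Bound and CAPACITY 40Gi once the ebs-essd volume is provisioned
kubectl get pvc test-pvc -n <namespace>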
In the demo above, the cloud disk is mounted at /var/data. Job code can write intermediate data into this directory for later RayJob tasks to consume.
Note that the default VKE StorageClass uses the Delete reclaim policy, so deleting the PVC above also deletes the PV. You can orchestrate a series of tasks with Airflow and delete the PVC only at the end of the workflow, so that intermediate results can be reused across jobs. Alternatively, you can change the reclaim policy so that the PV is retained when the PVC is deleted, but then you are responsible for deciding when to clean up the data.
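As a sketch of the second option: the reclaimPolicy of an existing StorageClass cannot be edited in place, so you can either create a copy of the ebs-essd StorageClass with reclaimPolicy: Retain, or patch the PV that is already bound to the PVC (the PV name below is a placeholder):

# Look up the PV bound to test-pvc, then switch its reclaim policy to Retain
kubectl get pvc test-pvc -n <namespace> -o jsonpath='{.spec.volumeName}'
kubectl patch pv <pv-name> -p '{"spec":{"persistentVolumeReclaimPolicy":"Retain"}}'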
In real business scenarios, job code is usually managed in a Git repository. The following shows how to use an init container to pull the Git repository for job testing:
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
  annotations:
    # This annotation is required; without it, accessing the Dashboard returns 404
    nginx.ingress.kubernetes.io/rewrite-target: /$1
spec:
  entrypoint: python /app/apiserver/test/e2e/resources/counter_sample.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  shutdownAfterJobFinishes: true
  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  ttlSecondsAfterFinished: 3000
  runtimeEnvYAML: |
    env_vars:
      counter_name: "test_counter"
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.3' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      # Enable ingress
      enableIngress: true
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          initContainers:
            - name: git-clone
              image: alpine/git
              command:
                - /bin/sh
                - -c
                - |
                  git clone https://github.com/ray-project/kuberay /app
              volumeMounts:
                - name: code
                  mountPath: /app
          containers:
            - name: ray-head
              image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
              volumeMounts:
                - name: code
                  mountPath: /app
          volumes:
            - name: code
              emptyDir: {}
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        # pod template
        template:
          spec:
            containers:
              - name: ray-worker
                image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
                lifecycle:
                  preStop:
                    exec:
                      command: ["/bin/sh", "-c", "ray stop"]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"
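Once the pods are up, you can confirm that the repository was cloned into the shared emptyDir, for example (pod name and namespace are placeholders):

# The entrypoint script should be visible inside the head pod
kubectl exec -it <rayjob-sample-raycluster-xxxxx-head-xxxxx> -n <namespace> -- \
  ls /app/apiserver/test/e2e/resources/counter_sample.py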
In real corporate environments, credentials are usually required to pull the Git repository. This can be set up as follows:
kubectl create secret generic ssh-key-secret --from-file=ssh-privatekey=<absolute path to your id_rsa>

# Modify the initContainers section of the manifest above as follows:
initContainers:
  - name: git-clone
    image: alpine/git
    command:
      - /bin/sh
      - -c
      - |
        git clone https://github.com/ray-project/kuberay /app
        sed -i 's/2.26.0/2.31.0/g' /app/apiserver/test/e2e/resources/counter_sample.py
    volumeMounts:
      - name: code
        mountPath: /app
      - name: ssh-key-volume
        mountPath: /root/.ssh
# The volumes section needs to mount the Secret created above:
volumes:
  - name: code
    emptyDir: {}
  - name: ssh-key-volume
    secret:
      secretName: ssh-key-secret
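For a private repository the clone typically goes over SSH rather than HTTPS. The sketch below, using a placeholder repository address, shows one way to wire the mounted key into the init container; note that SSH refuses keys with permissive file modes, so the Secret is mounted with defaultMode 0400, and host-key checking is disabled here for brevity (mount a known_hosts file instead if you need strict checking):

initContainers:
  - name: git-clone
    image: alpine/git
    env:
      # Point git at the mounted private key and skip interactive host-key verification
      - name: GIT_SSH_COMMAND
        value: "ssh -i /root/.ssh/ssh-privatekey -o StrictHostKeyChecking=no"
    command:
      - /bin/sh
      - -c
      - |
        git clone git@github.com:<your-org>/<your-private-repo>.git /app
    volumeMounts:
      - name: code
        mountPath: /app
      - name: ssh-key-volume
        mountPath: /root/.ssh
        readOnly: true
volumes:
  - name: code
    emptyDir: {}
  - name: ssh-key-volume
    secret:
      secretName: ssh-key-secret
      # SSH requires the key file not to be group/world readable
      defaultMode: 0400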
For jobs that need GPUs, request nvidia.com/gpu on both the head and worker containers. The example below runs a short script that prints whether CUDA is available:

apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  shutdownAfterJobFinishes: true
  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  ttlSecondsAfterFinished: 300
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.3' # should match the Ray version in the image of the containers
    enableInTreeAutoscaling: true
    # Ray head pod template
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                  nvidia.com/gpu: 1
                requests:
                  cpu: "200m"
                  nvidia.com/gpu: 1
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        # pod template
        template:
          spec:
            containers:
              - name: ray-worker
                image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
                lifecycle:
                  preStop:
                    exec:
                      command: ["/bin/sh", "-c", "ray stop"]
                resources:
                  limits:
                    cpu: "1"
                    nvidia.com/gpu: 1
                  requests:
                    cpu: "200m"
                    nvidia.com/gpu: 1
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import torch

    ray.init()
    print(torch.cuda.is_available())
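Before submitting, it helps to confirm that the cluster actually exposes schedulable GPU resources (this assumes an NVIDIA device plugin is installed so that nodes report nvidia.com/gpu):

# GPU nodes should report nvidia.com/gpu under Capacity/Allocatable
kubectl describe nodes | grep nvidia.com/gpu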
Checking the job logs shows that CUDA is available (torch.cuda.is_available() prints True):
INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-drs78-head-svc.test2.svc.cluster.local:8265
SUCC cli.py:60 -- ------------------------------------------------
SUCC cli.py:61 -- Job 'rayjob-sample-rb8g5' submitted successfully
SUCC cli.py:62 -- ------------------------------------------------
INFO cli.py:285 -- Next steps
INFO cli.py:286 -- Query the logs of the job:
INFO cli.py:288 -- ray job logs rayjob-sample-rb8g5
INFO cli.py:290 -- Query the status of the job:
INFO cli.py:292 -- ray job status rayjob-sample-rb8g5
INFO cli.py:294 -- Request the job to be stopped:
INFO cli.py:296 -- ray job stop rayjob-sample-rb8g5
INFO cli.py:303 -- Tailing logs until the job exits (disable with --no-wait):
INFO worker.py:1405 -- Using address 192.168.0.11:6379 set in the environment variable RAY_ADDRESS
INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 192.168.0.11:6379...
INFO worker.py:1715 -- Connected to Ray cluster. View the dashboard at 192.168.0.11:8265
True
SUCC cli.py:60 -- -----------------------------------
SUCC cli.py:61 -- Job 'rayjob-sample-rb8g5' succeeded
SUCC cli.py:62 -- -----------------------------------
For an introduction to how the autoscaler works, see: Enabling the Autoscaler on a RayCluster. The example below enables the in-tree autoscaler on the cluster created by a RayJob and submits 40 slow tasks so that the cluster has to scale out:
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
  annotations:
    # This annotation is required; without it, accessing the Dashboard returns 404
    nginx.ingress.kubernetes.io/rewrite-target: /$1
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  shutdownAfterJobFinishes: true
  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  ttlSecondsAfterFinished: 300
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.3' # should match the Ray version in the image of the containers
    # If enableInTreeAutoscaling is true, the autoscaler sidecar will be added to the Ray head pod.
    enableInTreeAutoscaling: true
    autoscalerOptions:
      # upscalingMode is "Default" or "Aggressive" or "Conservative"
      # Conservative: Upscaling is rate-limited; the number of pending worker pods is at most the size of the Ray cluster.
      # Default: Upscaling is not rate-limited.
      # Aggressive: An alias for Default; upscaling is not rate-limited.
      upscalingMode: Conservative
      # idleTimeoutSeconds is the number of seconds to wait before scaling down a worker pod which is not using Ray resources.
      idleTimeoutSeconds: 60
      # imagePullPolicy optionally overrides the autoscaler container's default image pull policy (IfNotPresent).
      imagePullPolicy: IfNotPresent
      # Optionally specify the autoscaler container's securityContext.
      securityContext: {}
      env: []
      envFrom: []
    headGroupSpec:
      # Enable ingress
      enableIngress: true
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "1"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        # pod template
        template:
          spec:
            containers:
              - name: ray-worker
                image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
                lifecycle:
                  preStop:
                    exec:
                      command: ["/bin/sh", "-c", "ray stop"]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "1"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import time

    ray.init()

    @ray.remote
    def square(x):
        time.sleep(100)
        return x * x

    futures = [square.remote(i) for i in range(40)]
    print(ray.get(futures))
The result can be observed in the autoscaler sidecar's logs:

kubectl logs <RayCluster head pod name> -c autoscaler -n <namespace of the RayCluster>
The autoscaler log shows that the autoscaler issues scale-up requests when resources are tight. After the job finishes and idleTimeoutSeconds has elapsed, idle worker pods go through the scale-down process.
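To observe the scale-up and scale-down from the Kubernetes side as well, you can watch the worker pods of the cluster (a sketch; KubeRay labels the pods it creates with ray.io/cluster=<RayCluster name>):

# Worker pods grow while the 40 tasks are pending, then shrink after idleTimeoutSeconds
kubectl get pods -l ray.io/cluster=<raycluster-name> -n <namespace> -w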