在部分场景下,ray/spark 作业可能需要消耗比较大规格的资源来运行作业。从资源调度角度来看,当您没有配置诸如(反)亲和性等设置时,k8s 默认调度器会默认采用最小化Pod数量、平衡资源使用、优先使用最空闲的节点等策略保证调度的公平性。这种基于公平的调度可能会导致一些场景下出现资源碎片的问题,即全集群总资源足够,但是由于 pod 的 placement 不够合理而导致大作业饥饿。一个典型场景示例如下图所示:
集群当前资源使用及 pod 分布如下:
此时如果需要调度 2 个 6C 24Gi 的pod,则由于内存余量原因,只有节点 2 会成功调度到 1 个 pod 而剩余 1 个 pod 处于 Pending 状态。但是从资源总量来看,所有 pod 应该是可以调度成功的:
总量:24C 96Gi 需要:2C8G * 2 + 1C4Gi*3 + 6C24Gi * 2 = 19C 76Gi
在这类场景下,我们需要通过优先级等能力来动态调整资源分配以便能避免类似的资源碎片问题。
在默认情况下,EMR 集群及其他 Pod 的优先级为 0。我们可以对高优先级、资源消耗较大的 Pod 设置高优先级,以便 K8S 调度器可以通过驱逐等方式满足所有 Pod 的部署要求。
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: high-priority
value: 1000000
globalDefault: false
description: "This priority class should be used for high priority pods."
使用 kubectl apply 命令提交上述 yaml 创建。
apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-priority
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /$1
testcase: priority
spec:
entrypoint: python /home/ray/samples/sample_code.py
# shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
shutdownAfterJobFinishes: true
# ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
ttlSecondsAfterFinished: 3000
runtimeEnvYAML: |
env_vars:
counter_name: "test_counter"
# rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
rayClusterSpec:
rayVersion: '2.9.3' # should match the Ray version in the image of the containers
# Ray head pod template
headGroupSpec:
# 开启 ingress
enableIngress: true
rayStartParams:
dashboard-host: '0.0.0.0'
template:
metadata:
annotations:
testcase: priority
spec:
priorityClassName: high-priority
containers:
- name: ray-head
image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265 # Ray dashboard
name: dashboard
- containerPort: 10001
name: client
resources:
limits:
cpu: "1"
memory: "4Gi"
requests:
cpu: "1"
memory: "4Gi"
volumeMounts:
- mountPath: /home/ray/samples
name: code-sample
volumes:
- name: code-sample
configMap:
# Provide the name of the ConfigMap you want to mount.
name: ray-job-code-sample
# An array of keys from the ConfigMap to create as files
items:
- key: sample_code.py
path: sample_code.py
workerGroupSpecs:
# the pod replicas in this group typed worker
- replicas: 1
minReplicas: 2
maxReplicas: 5
# logical group name, for this called small-group, also can be functional
groupName: small-group
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
rayStartParams: {}
#pod template
template:
metadata:
annotations:
testcase: priority
spec:
priorityClassName: high-priority
containers:
- name: ray-worker
image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
cpu: "5"
memory: "20Gi"
requests:
cpu: "5"
memory: "20Gi"
---
apiVersion: v1
kind: ConfigMap
metadata:
name: ray-job-code-sample
data:
sample_code.py: |
import ray
import os
import requests
import time
ray.init()
@ray.remote
class Counter:
def __init__(self):
self.name = os.getenv("counter_name")
assert self.name == "test_counter"
self.counter = 0
def inc(self):
self.counter += 1
def get_counter(self):
return "{} got {}".format(self.name, self.counter)
counter = Counter.remote()
for _ in range(5):
ray.get(counter.inc.remote())
time.sleep(5)
print(ray.get(counter.get_counter.remote()))
assert requests.__version__ == "2.31.0"
通过指定 priorityClassName: high-priority,当资源不足时 k8s 将按照 pod 优先级进行驱逐及重新尝试调度,以满足 pod 的资源使用诉求。
Spark operator 支持使用 driver/executor template 为 pod 设置优先级,您可以通过 EMR 控制台修改对应配置项以达到上述能力:
之后您正常提交 SparkApplication 对应 Pod 内就会有相应的 PriorityClass。
注意
当使用 PriorityClass 后,在资源不足的情况下可能对低优先级 Pod 进行抢占。请确保其他被驱逐的 Pod 不会受重启等影响,以防止对您的业务造成损失。您可以对禁止驱逐的 Pod 设置相同或更高的优先级以避免类似影响。