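In some scenarios, Ray/Spark jobs need relatively large resource specifications to run. From a scheduling perspective, when you have not configured settings such as (anti-)affinity, the default Kubernetes scheduler keeps scheduling fair by applying strategies such as minimizing the number of pods per node, balancing resource usage, and preferring the least-loaded nodes. In some cases this fairness-oriented scheduling leads to resource fragmentation: the cluster has enough resources in total, but because pods are placed suboptimally, large jobs starve. A typical scenario is shown in the figure below:
Current resource usage and pod distribution in the cluster:
If two 6C 24Gi pods now need to be scheduled, only node 2 has enough free memory, so one pod is scheduled there and the other stays Pending. In terms of total resources, however, all pods should fit: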
Total capacity: 24C 96Gi. Required: 2 × (2C 8Gi) + 3 × (1C 4Gi) + 2 × (6C 24Gi) = 19C 76Gi.
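To make the fragmentation argument concrete, the following Python sketch compares aggregate demand with capacity and then tries to place the two 6C 24Gi pods node by node without preemption. The per-node layout is hypothetical (three identical 8C 32Gi nodes, chosen only to be consistent with the totals above; it is not necessarily the layout in the original figure), and first-fit placement is a stand-in for the default scheduler's scoring, not its real algorithm.

```python
# Hypothetical layout (assumption): three identical 8C/32Gi nodes.
NODE_CAPACITY = (8, 32)  # (CPU cores, memory GiB)
EXISTING = {
    "node-1": [(2, 8), (1, 4)],
    "node-2": [(1, 4)],
    "node-3": [(2, 8), (1, 4)],
}
NEW_POD = (6, 24)  # each new pod requests 6C 24Gi
NEW_COUNT = 2


def aggregate_check():
    """Compare total demand (existing + new pods) with total cluster capacity."""
    pods = [p for node_pods in EXISTING.values() for p in node_pods]
    pods += [NEW_POD] * NEW_COUNT
    demand = (sum(c for c, _ in pods), sum(m for _, m in pods))
    capacity = (NODE_CAPACITY[0] * len(EXISTING), NODE_CAPACITY[1] * len(EXISTING))
    return demand, capacity


def first_fit_without_preemption():
    """Place new pods one by one on the first node with enough free CPU and memory."""
    free = {
        node: [NODE_CAPACITY[0] - sum(c for c, _ in pods),
               NODE_CAPACITY[1] - sum(m for _, m in pods)]
        for node, pods in EXISTING.items()
    }
    placed, pending = [], 0
    for _ in range(NEW_COUNT):
        for node, (cpu, mem) in free.items():
            if cpu >= NEW_POD[0] and mem >= NEW_POD[1]:
                free[node] = [cpu - NEW_POD[0], mem - NEW_POD[1]]
                placed.append(node)
                break
        else:
            pending += 1  # no single node can hold this pod
    return placed, pending


if __name__ == "__main__":
    demand, capacity = aggregate_check()
    print("demand", demand, "capacity", capacity)
    print("placed/pending", first_fit_without_preemption())
```

Under this assumed layout the totals match the arithmetic above (19C 76Gi required out of 24C 96Gi), yet only node-2 can host a 6C 24Gi pod, so one pod stays Pending.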
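By default, the pods of the EMR cluster and other workloads have priority 0. You can give important, resource-intensive pods a higher priority so that the Kubernetes scheduler can satisfy the placement requirements of all pods, for example by preempting (evicting) lower-priority pods. First, define a PriorityClass: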
```yaml
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 1000000
globalDefault: false
description: "This priority class should be used for high priority pods."
```
Save the YAML above to a file and create the PriorityClass with `kubectl apply -f`.
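If you prefer to create the PriorityClass from code instead of `kubectl`, a minimal sketch with the official `kubernetes` Python client might look like this. It assumes the client is installed and that a kubeconfig with permission to create cluster-scoped objects is available; the helper names are hypothetical.

```python
def priority_class_manifest(name="high-priority", value=1_000_000,
                            description="This priority class should be used for high priority pods."):
    """Build the same PriorityClass object as the YAML above, as a plain dict."""
    # User-defined PriorityClass values must be 32-bit integers <= 1 billion;
    # larger values are reserved for system-critical classes.
    if not (-2**31 <= value <= 1_000_000_000):
        raise ValueError(f"invalid PriorityClass value: {value}")
    return {
        "apiVersion": "scheduling.k8s.io/v1",
        "kind": "PriorityClass",
        "metadata": {"name": name},
        "value": value,
        "globalDefault": False,
        "description": description,
    }


def create_priority_class(manifest):
    """Create the PriorityClass; requires `pip install kubernetes` and a kubeconfig."""
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    config.load_kube_config()
    name = manifest["metadata"]["name"]
    try:
        client.SchedulingV1Api().create_priority_class(body=manifest)
        print(f"created PriorityClass {name}")
    except ApiException as exc:
        if exc.status == 409:  # AlreadyExists: leave the existing object unchanged
            print(f"PriorityClass {name} already exists")
        else:
            raise


if __name__ == "__main__":
    create_priority_class(priority_class_manifest())
```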
Then reference the PriorityClass through `priorityClassName` in the workload's pod templates. The following RayJob sets it on both the head and the worker pods:

```yaml
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-priority
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /$1
    testcase: priority
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  shutdownAfterJobFinishes: true
  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  ttlSecondsAfterFinished: 3000
  runtimeEnvYAML: |
    env_vars:
      counter_name: "test_counter"
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.3' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      # Enable ingress
      enableIngress: true
      rayStartParams:
        dashboard-host: ''
      template:
        metadata:
          annotations:
            testcase: priority
        spec:
          priorityClassName: high-priority
          containers:
            - name: ray-head
              image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                  memory: "4Gi"
                requests:
                  cpu: "1"
                  memory: "4Gi"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 2
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        # pod template
        template:
          metadata:
            annotations:
              testcase: priority
          spec:
            priorityClassName: high-priority
            containers:
              - name: ray-worker
                image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.9.3-py3.9-ubuntu20.04-20240402
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh", "-c", "ray stop" ]
                resources:
                  limits:
                    cpu: "5"
                    memory: "20Gi"
                  requests:
                    cpu: "5"
                    memory: "20Gi"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests
    import time

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Set through runtimeEnvYAML.env_vars in the RayJob above.
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        time.sleep(5)
        print(ray.get(counter.get_counter.remote()))

    # Assumes the container image ships requests 2.31.0.
    assert requests.__version__ == "2.31.0"
```
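Because `priorityClassName: high-priority` is set, when resources are insufficient Kubernetes preempts lower-priority pods according to pod priority and retries scheduling, so that the resource requests of the high-priority pods can be met.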
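How preemption can resolve the fragmentation scenario is illustrated below with a deliberately simplified Python model. It assumes the same hypothetical three-node, 8C 32Gi layout as before. For each node it first drops every lower-priority pod and then keeps back as many of them as possible (highest priority first), which mirrors the idea behind kube-scheduler's victim selection but omits PodDisruptionBudgets, graceful termination, node nomination, and most tie-breaking rules. All names are hypothetical.

```python
from dataclasses import dataclass, field

HIGH = 1_000_000  # value of the high-priority PriorityClass; other pods default to 0


@dataclass
class Pod:
    name: str
    cpu: int       # cores requested
    mem_gi: int    # GiB requested
    priority: int = 0


@dataclass
class Node:
    name: str
    cpu: int
    mem_gi: int
    pods: list = field(default_factory=list)


def fits(node, pods, pod):
    """True if `pod` fits on `node` when exactly `pods` are running there."""
    free_cpu = node.cpu - sum(p.cpu for p in pods)
    free_mem = node.mem_gi - sum(p.mem_gi for p in pods)
    return pod.cpu <= free_cpu and pod.mem_gi <= free_mem


def victims_on_node(node, pod):
    """Lower-priority pods to evict so `pod` fits, or None if impossible."""
    kept = [p for p in node.pods if p.priority >= pod.priority]
    lower = [p for p in node.pods if p.priority < pod.priority]
    if not fits(node, kept, pod):
        return None  # even evicting every lower-priority pod is not enough
    victims = []
    for p in sorted(lower, key=lambda q: q.priority, reverse=True):
        if fits(node, kept + [p], pod):
            kept.append(p)  # reprieve: the pod still fits with p kept
        else:
            victims.append(p)
    return victims


def schedule(nodes, pod):
    """Place `pod`; return (node name or None, evicted pods)."""
    for node in nodes:  # 1) normal placement, no eviction
        if fits(node, node.pods, pod):
            node.pods.append(pod)
            return node.name, []
    best = None  # 2) preemption: lowest top victim priority, then sum, then count
    for node in nodes:
        victims = victims_on_node(node, pod)
        if victims is None:
            continue
        prios = [v.priority for v in victims]
        key = (max(prios, default=-1), sum(prios), len(victims))
        if best is None or key < best[0]:
            best = (key, node, victims)
    if best is None:
        return None, []  # stays Pending
    _, node, victims = best
    # Simplification: the real scheduler waits for victims to terminate first.
    node.pods = [p for p in node.pods if p not in victims] + [pod]
    return node.name, victims


def run_scenario():
    nodes = [
        Node("node-1", 8, 32, [Pod("a", 2, 8), Pod("b", 1, 4)]),
        Node("node-2", 8, 32, [Pod("c", 1, 4)]),
        Node("node-3", 8, 32, [Pod("d", 2, 8), Pod("e", 1, 4)]),
    ]
    queue = [Pod("big-1", 6, 24, HIGH), Pod("big-2", 6, 24, HIGH)]
    log = []
    while queue:
        pod = queue.pop(0)
        node_name, victims = schedule(nodes, pod)
        log.append((pod.name, node_name, [v.name for v in victims]))
        queue.extend(victims)  # evicted pods return to the scheduling queue
    return log


if __name__ == "__main__":
    for entry in run_scenario():
        print(entry)
```

In this model the second high-priority pod evicts one 1C 4Gi pod from node-1, and the evicted pod is then rescheduled onto the space left on node-2, so every pod ends up placed.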
The Spark operator supports setting pod priority through driver/executor pod templates. You can enable this by modifying the corresponding configuration items in the EMR console.
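As an illustration of the pod-template mechanism, the sketch below writes driver and executor templates that set `priorityClassName` and returns the Spark properties pointing at them. `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile` are standard Spark on Kubernetes settings; whether the EMR console exposes exactly these keys, and where the files must live so the operator can read them, are assumptions to check against your console. The output directory and helper names are hypothetical.

```python
import json
from pathlib import Path


def pod_template(priority_class):
    """Minimal pod template; Spark merges its own driver/executor settings on top."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "spec": {"priorityClassName": priority_class},
    }


def write_pod_templates(out_dir, priority_class="high-priority"):
    """Write driver/executor templates and return the matching Spark properties."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    conf = {}
    for role in ("driver", "executor"):
        path = out / f"{role}-template.yaml"
        # JSON is valid YAML, so json.dumps avoids a PyYAML dependency.
        path.write_text(json.dumps(pod_template(priority_class), indent=2))
        conf[f"spark.kubernetes.{role}.podTemplateFile"] = str(path)
    return conf


if __name__ == "__main__":
    for key, value in write_pod_templates("./pod-templates").items():
        print(f"--conf {key}={value}")
```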
After that, submit your SparkApplication as usual; its driver and executor pods will carry the corresponding PriorityClass.
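To confirm that the setting took effect, you can list the application's pods and check `spec.priorityClassName`. This sketch assumes the `kubernetes` Python client and that the operator labels pods with `sparkoperator.k8s.io/app-name`; the function names and command-line arguments are hypothetical.

```python
import sys


def pods_missing_priority(pods, expected="high-priority"):
    """Return names of pods whose spec.priorityClassName is not `expected`."""
    return sorted(
        p["metadata"]["name"]
        for p in pods
        if p.get("spec", {}).get("priorityClassName") != expected
    )


def fetch_spark_pods(namespace, app_name):
    """List the driver/executor pods of one SparkApplication as plain dicts."""
    from kubernetes import client, config

    config.load_kube_config()
    api = client.ApiClient()
    resp = client.CoreV1Api(api).list_namespaced_pod(
        namespace,
        # Assumption: the operator labels pods with the SparkApplication name.
        label_selector=f"sparkoperator.k8s.io/app-name={app_name}",
    )
    # sanitize_for_serialization turns model objects into camelCase dicts.
    return [api.sanitize_for_serialization(p) for p in resp.items]


if __name__ == "__main__":
    # Hypothetical usage: python check_priority.py <namespace> <app-name>
    missing = pods_missing_priority(fetch_spark_pods(sys.argv[1], sys.argv[2]))
    print("all pods use high-priority" if not missing else f"missing: {missing}")
```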
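Once PriorityClass is in use, lower-priority pods may be preempted when resources are insufficient. Make sure that any pods that might be evicted can tolerate restarts and similar disruptions, so that your business is not affected. For pods that must not be evicted, assign the same or a higher priority to avoid this.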
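Before relying on preemption, it can help to see which pods are potential victims, i.e. pods whose `spec.priority` is lower than the preemptor's value (1000000 for `high-priority` above). The sketch below assumes the `kubernetes` Python client and cluster-wide read access; it only lists candidates and does not predict which pods the scheduler will actually evict. Function names are hypothetical.

```python
HIGH_PRIORITY_VALUE = 1_000_000  # value of the high-priority class above


def preemption_candidates(pods, preemptor_priority=HIGH_PRIORITY_VALUE):
    """Return sorted (namespace, name, priority) for pods a preemptor could evict."""
    out = []
    for p in pods:
        # spec.priority is filled in by the Priority admission controller;
        # treat a missing value as 0, the default when no globalDefault class exists.
        prio = p.get("spec", {}).get("priority") or 0
        if prio < preemptor_priority:  # pods with equal or higher priority are not victims
            meta = p["metadata"]
            out.append((meta.get("namespace", ""), meta["name"], prio))
    return sorted(out)


def fetch_all_pods():
    """List every pod in the cluster as a camelCase dict."""
    from kubernetes import client, config

    config.load_kube_config()
    api = client.ApiClient()
    items = client.CoreV1Api(api).list_pod_for_all_namespaces().items
    return [api.sanitize_for_serialization(p) for p in items]


if __name__ == "__main__":
    for ns, name, prio in preemption_candidates(fetch_all_pods()):
        print(f"{ns}/{name}\tpriority={prio}")
```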