Using the Volcano Scheduler in Place of the K8s Default Scheduler to Run Spark and Ray Jobs
Last updated: 2024.04.24 14:31:44 | First published: 2024.04.24 14:31:44

EMR on VKE supports running Spark and Ray jobs with the K8s default scheduler. If you have any of the following requirements, consider replacing the K8s default scheduler with the Volcano scheduler for better job behavior:

  1. Fair scheduling across jobs: prevent a single job's Driver/Executor pods from consuming too many resources and starving other jobs;

  2. Queues: separate the quota available to different jobs and set min/max resource limits for different job types;

  3. Resource reservation: avoid the resource deadlock that arises under high job concurrency when the Drivers alone occupy all resources;

  4. Batch scheduling: gang-schedule all pods of a job as a group.

1 Install the Volcano Scheduler

Option 1: Install directly from the YAML manifest

kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development.yaml

Option 2: Install with the Helm chart

This example installs from a checkout of the source tree; see https://github.com/volcano-sh/volcano

helm install volcano installer/helm/chart/volcano --namespace volcano-system --create-namespace

# list helm release
helm list -n volcano-system
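
# After either installation method, verify that the Volcano components
# (scheduler, admission webhook, controllers) are running before continuing;
# pod names vary slightly by version:
kubectl get pods -n volcano-system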

2 Schedule Spark Jobs with Volcano

2.1 Modify the SparkOperator configuration to enable the Volcano scheduler

Update via the Helm installation directory:

  1. Find the EMR agent pod and exec into it. You can locate the agent with the following commands:
kubectl get pods -n {your EMR cluster ID}

kubectl exec -it {agent pod name from the previous step} -n {your EMR cluster ID} -- /bin/bash

cd /tmp/SPARK/1.0.0/client/charts
  2. (Optional) If your cluster version is earlier than EMR-VKE-1.4.0, apply the following changes manually:

    1. Replace /tmp/SPARK/1.0.0/client/charts/SparkOperator/templates/webhook-cleanup-job.yaml with the following content:
    {{ if .Values.webhook.enable }}
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: {{ include "spark-operator.fullname" . }}-webhook-cleanup
      annotations:
        {{- toYaml .Values.webhook.cleanupAnnotations | nindent 4 }}
      labels:
        {{- include "spark-operator.labels" . | nindent 4 }}
    spec:
      template:
        metadata:
          name: {{ include "spark-operator.fullname" . }}-webhook-cleanup
          {{- if .Values.istio.enabled }}
          annotations:
            "sidecar.istio.io/inject": "false"
          {{- end }}
          {{- if .Values.webhook.cleanupPodLabels }}
          labels:
            {{- toYaml .Values.webhook.cleanupPodLabels | nindent 8 }}
          {{- end }}
        spec:
          serviceAccountName: {{ include "spark-operator.serviceAccountName" . }}
          restartPolicy: OnFailure
          {{- with .Values.imagePullSecrets }}
          imagePullSecrets:
            {{- toYaml . | nindent 8 }}
          {{- end }}
          containers:
          - name: clean-secret
            image: {{ .Values.image.repository }}:{{ default .Chart.AppVersion .Values.image.tag }}
            imagePullPolicy: {{ .Values.image.pullPolicy }}
            securityContext:
              {{- toYaml .Values.securityContext | nindent 10 }}
            command:
            - "/bin/sh"
            - "-c"
            - "curl -ik \
              -X DELETE \
              -H \"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)\" \
              -H \"Accept: application/json\" \
              -H \"Content-Type: application/json\" \
              https://kubernetes.default.svc/api/v1/namespaces/{{ .Release.Namespace }}/secrets/{{ include "spark-operator.fullname" . }}-webhook-certs \
              && \
              curl -ik \
              -X DELETE \
              -H \"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)\" \
              -H \"Accept: application/json\" \
              -H \"Content-Type: application/json\" \
              --data \"{\\\"kind\\\":\\\"DeleteOptions\\\",\\\"apiVersion\\\":\\\"batch/v1\\\",\\\"propagationPolicy\\\":\\\"Foreground\\\"}\" \
              https://kubernetes.default.svc/apis/batch/v1/namespaces/{{ .Release.Namespace }}/jobs/{{ include "spark-operator.fullname" . }}-webhook-init"
            resources:
              {{- toYaml .Values.webhook.cleanupResources | nindent 10 }}
          {{- with .Values.tolerations }}
          tolerations:
            {{- toYaml . | nindent 8 }}
          {{- end }}
          {{- with .Values.nodeSelector }}
          nodeSelector:
            {{- toYaml . | nindent 8 }}
          {{- end }}
    {{ end }}
    
  3. Run the upgrade command:

helm upgrade spark-operator ./SparkOperator --namespace {namespace where your EMR Spark is deployed} --set batchScheduler.enable=true --set webhook.enable=true
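
You can confirm that the overrides took effect by inspecting the release's user-supplied values:

helm get values spark-operator -n {namespace where your EMR Spark is deployed}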

2.2 Submit a Spark job and have Volcano schedule it

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-wordcount
spec:
  type: Scala
  sparkVersion: 3.2.1
  batchScheduler: "volcano"
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "tos://xxx/spark-examples_2.12-3.3.3.jar"
  arguments:
    - "1000"
  driver:
    cores: 1
    coreLimit: 1000m
    memory: 8g
  executor:
    cores: 1
    coreLimit: 1000m
    memory: 4g
    memoryOverhead: 2g
    instances: 1
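
Save the manifest and submit it (assuming it is saved as spark-wordcount.yaml; the file name is arbitrary):

kubectl apply -f spark-wordcount.yaml -n {namespace where your EMR Spark is deployed}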

After the job is submitted, you can check the scheduling result via the driver pod events:
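For example (the spark-operator names the driver pod {application name}-driver by default):

kubectl describe pod spark-wordcount-driver -n {namespace where your EMR Spark is deployed}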

Events:
  Type    Reason     Age   From                  Message
  ----    ------     ----  ----                  -------
  Normal  Scheduled  27s   volcano               Successfully assigned emr-3ia0hq36gf3bd5my2wxh-spark/spark-wordcount-driver to 172.16.2.27

You can also inspect Volcano's default queue to see how resources are allocated.
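
For example (Volcano creates the default queue automatically at installation):

kubectl describe queue default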

2.3 Advanced features

If you want to limit and control a job through a specific queue, you can create a Volcano queue manually:

cat << EOF > sparkQ.yaml
apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
  name: sparkqueue
spec:
  weight: 1
  reclaimable: false
  capability:
    cpu: 4
    memory: 16Gi
EOF

kubectl apply -f sparkQ.yaml
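
# a quick check that the queue was created
kubectl get queue sparkqueue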

A Volcano queue is an entity that holds a group of PodGroups; it is the basis of resource isolation and determines how that group of PodGroups shares cluster resources. Spark/Ray jobs can be submitted into a queue. Modify the SparkApplication YAML above to create a job that uses the queue:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-wordcount-2
spec:
  type: Scala
  sparkVersion: 3.2.1
  batchScheduler: "volcano"
  batchSchedulerOptions:
    queue: "sparkqueue"
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "tos://{your bucket}/SparkJobs/spark-examples_2.12-3.3.3.jar"
  arguments:
    - "1000"
  driver:
    cores: 1
    coreLimit: 1000m
    memory: 8g
  executor:
    cores: 1
    coreLimit: 1000m
    memory: 4g
    memoryOverhead: 2g
    instances: 1
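
Submit it the same way (assuming the manifest is saved as spark-wordcount-2.yaml). If the queue's capability is already exhausted, the new pods stay Pending until resources are released, which you can observe by watching the pods:

kubectl apply -f spark-wordcount-2.yaml -n {namespace where your EMR Spark is deployed}
kubectl get pods -n {namespace where your EMR Spark is deployed} -w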

You can check job scheduling by querying the Volcano queue's usage:

kubectl describe queue {name of the queue created above}
...
Spec:
  Capability:
    Cpu:        4
    Memory:     16Gi
  Reclaimable:  false
  Weight:       1
Status:
  Allocated:
    Cpu:                        2
    Memory:                     15155Mi
    Pods:                       2
    vke.volcengine.com/eni-ip:  2

3 Schedule Ray Jobs with Volcano

Modify the RayOperator configuration to enable the Volcano scheduler

Update via the Helm installation directory:

  1. Find the EMR agent pod and exec into it. You can locate the agent with the following commands:
kubectl get pods -n {your EMR cluster ID}

kubectl exec -it {agent pod name from the previous step} -n {your EMR cluster ID} -- /bin/bash

cd /tmp/RAY/1.0.0/client/charts
  2. (Optional) If your cluster version is earlier than EMR-VKE-1.4.0, apply the following changes manually:

    1. Replace /tmp/RAY/1.0.0/client/charts/kuberay-operator/templates/rolebinding.yaml with the following content:
    {{- if .Values.rbacEnable }}
    kind: ClusterRoleBinding
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      labels:
    {{ include "kuberay-operator.labels" . | indent 4 }}
      name: {{ include "kuberay-operator.fullname" . }}
    subjects:
    - kind: ServiceAccount
      name: {{ .Values.serviceAccount.name  }}
      namespace: {{ .Release.Namespace }}
    roleRef:
      kind: ClusterRole
      name: {{ include "kuberay-operator.fullname" . }}
      apiGroup: rbac.authorization.k8s.io
    {{- end }}
    
    2. Replace /tmp/RAY/1.0.0/client/charts/kuberay-operator/templates/role.yaml with the following content:
    {{- if .Values.rbacEnable }}
    kind: ClusterRole
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      labels:
    {{ include "kuberay-operator.labels" . | indent 4 }}
      name: {{ include "kuberay-operator.fullname" . }}
    rules:
    - apiGroups:
      - batch
      resources:
      - jobs
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - coordination.k8s.io
      resources:
      - leases
      verbs:
      - create
      - get
      - list
      - update
    - apiGroups:
      - ""
      resources:
      - events
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - ""
      resources:
      - pods
      verbs:
      - create
      - delete
      - deletecollection
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - ""
      resources:
      - pods/status
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - ""
      resources:
      - serviceaccounts
      verbs:
      - create
      - delete
      - get
      - list
      - watch
    - apiGroups:
      - ""
      resources:
      - services
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - ""
      resources:
      - services/status
      verbs:
      - get
      - patch
      - update
    - apiGroups:
      - extensions
      resources:
      - ingresses
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - networking.k8s.io
      resources:
      - ingressclasses
      verbs:
      - get
      - list
      - watch
    - apiGroups:
      - networking.k8s.io
      resources:
      - ingresses
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - ray.io
      resources:
      - rayclusters
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - ray.io
      resources:
      - rayclusters/finalizers
      verbs:
      - update
    - apiGroups:
      - ray.io
      resources:
      - rayclusters/status
      verbs:
      - get
      - patch
      - update
    - apiGroups:
      - ray.io
      resources:
      - rayjobs
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - ray.io
      resources:
      - rayjobs/finalizers
      verbs:
      - update
    - apiGroups:
      - ray.io
      resources:
      - rayjobs/status
      verbs:
      - get
      - patch
      - update
    - apiGroups:
      - ray.io
      resources:
      - rayservices
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    - apiGroups:
      - ray.io
      resources:
      - rayservices/finalizers
      verbs:
      - update
    - apiGroups:
      - ray.io
      resources:
      - rayservices/status
      verbs:
      - get
      - patch
      - update
    - apiGroups:
      - rbac.authorization.k8s.io
      resources:
      - rolebindings
      verbs:
      - create
      - delete
      - get
      - list
      - watch
    - apiGroups:
      - rbac.authorization.k8s.io
      resources:
      - roles
      verbs:
      - create
      - delete
      - get
      - list
      - update
      - watch
    - apiGroups:
      - route.openshift.io
      resources:
      - routes
      verbs:
      - create
      - delete
      - get
      - list
      - patch
      - update
      - watch
    {{- if .Values.batchScheduler.enabled }}
    - apiGroups:
      - scheduling.volcano.sh
      resources:
      - podgroups
      verbs:
      - create
      - delete
      - get
      - list
      - update
      - watch
    - apiGroups:
      - apiextensions.k8s.io
      resources:
      - customresourcedefinitions
      verbs:
      - get
    {{- end }}
    {{- end }}
    
  3. Run the upgrade command to allow Ray to use the batch scheduler:

helm upgrade kuberay-operator ./kuberay-operator --namespace {namespace where your RayOperator is deployed} --set batchScheduler.enabled=true
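
With batchScheduler.enabled=true, recent KubeRay chart versions pass a batch-scheduler flag to the operator container; a quick way to confirm (the exact flag name may vary by chart version):

kubectl describe deployment kuberay-operator -n {namespace where your RayOperator is deployed} | grep -i batch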
  4. Create a queue:
cat << EOF > rayQ.yaml
apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
  name: rayqueue
spec:
  weight: 1
  reclaimable: false
  capability:
    cpu: 5
    memory: 10Gi
EOF

kubectl apply -f rayQ.yaml
  5. Create a RayCluster scheduled by Volcano:
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: ray-volcano-cluster
  labels:
    ray.io/scheduler-name: volcano
    volcano.sh/queue-name: rayqueue
spec:
  rayVersion: '2.7.0'
  headGroupSpec:
    rayStartParams: {}
    template:
      spec:
        containers:
          - name: ray-head
            image: {your Ray image address}
            resources:
              limits:
                cpu: "1"
                memory: "2Gi"
              requests:
                cpu: "1"
                memory: "2Gi"
  workerGroupSpecs:
    - groupName: worker
      rayStartParams: {}
      replicas: 2
      minReplicas: 2
      maxReplicas: 2
      template:
        spec:
          containers:
            - name: ray-worker
              image: {your Ray image address}
              resources:
                limits:
                  cpu: "1"
                  memory: "1Gi"
                requests:
                  cpu: "1"
                  memory: "1Gi"
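
After applying this manifest (assuming it is saved as ray-volcano-cluster.yaml), Volcano gang-schedules the head and worker pods through a PodGroup that KubeRay creates for the cluster; you can list it (the PodGroup name varies by KubeRay version):

kubectl apply -f ray-volcano-cluster.yaml -n {namespace where your RayOperator is deployed}
kubectl get podgroups -n {namespace where your RayOperator is deployed}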
  6. Create a RayJob scheduled by Volcano:
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample-0702
  labels:
    ray.io/scheduler-name: volcano
    volcano.sh/queue-name: rayqueue
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /$1 
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  shutdownAfterJobFinishes: true

  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  ttlSecondsAfterFinished: 3000

  runtimeEnvYAML: |
    env_vars:
      counter_name: "test_counter"
      
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.3' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      # Enable ingress
      enableIngress: true
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: {your Ray image address}
              ports:  
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        #pod template
        template:
          spec:
            containers:
              - name: ray-worker
                image: {your Ray image address}
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    assert requests.__version__ == "2.31.0"
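
Apply the manifest and track the job's progress through the RayJob resource (assuming the manifest is saved as rayjob-sample.yaml):

kubectl apply -f rayjob-sample.yaml -n {namespace where your RayOperator is deployed}
kubectl get rayjob rayjob-sample-0702 -n {namespace where your RayOperator is deployed}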
  7. You can check Volcano scheduling via the queue or pod information:

Ray head pod events:

Events:
  Type    Reason     Age   From                  Message
  ----    ------     ----  ----                  -------
  Normal  Scheduled  36s   volcano               Successfully assigned emr-3ia0hq36e35exzu87met-ray/ray-volcano-cluster-head-74v9n to 172.16.2.25

Queue:

Spec:
  Capability:
    Cpu:        5
    Memory:     10Gi
  Reclaimable:  false
  Weight:       1
Status:
  Allocated:
    Cpu:                        3
    Memory:                     4Gi
    Pods:                       3
    vke.volcengine.com/eni-ip:  3