EMR on VKE supports running Spark and Ray jobs with the default Kubernetes scheduler. If you have any of the following requirements, consider replacing the default scheduler with the Volcano scheduler for better job behavior:
Fair scheduling across jobs: prevent a single job's Driver/Executor pods from occupying too many resources and starving other jobs;
Queues: separate the quota available to each job and set min/max resource limits for different types of jobs;
Resource reservation: avoid the resource deadlock that occurs when, under high job concurrency, the Drivers alone occupy all resources;
Batch scheduling: support for scheduling groups of pods as a unit (gang scheduling).
kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development.yaml
This example installs Volcano from source; see the source at https://github.com/volcano-sh/volcano
helm install volcano installer/helm/chart/volcano --namespace volcano-system --create-namespace

# list helm release
helm list -n volcano-system
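After the installation completes, you can verify that the Volcano components are up (a minimal check; exact component names can vary slightly across Volcano versions):

# the scheduler, controllers and admission webhook pods should all be Running
kubectl get pods -n volcano-system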
Upgrade using the helm installation directory
kubectl get pods -n {your EMR cluster ID}
kubectl exec -it {agent pod name from the previous step} -n {your EMR cluster ID} -- /bin/bash
cd /tmp/SPARK/1.0.0/client/charts
(Optional) If your cluster version is earlier than EMR-VKE-1.4.0, manually apply the following upgrade:
{{ if .Values.webhook.enable }}
apiVersion: batch/v1
kind: Job
metadata:
  name: {{ include "spark-operator.fullname" . }}-webhook-cleanup
  annotations:
    {{- toYaml .Values.webhook.cleanupAnnotations | nindent 4 }}
  labels:
    {{- include "spark-operator.labels" . | nindent 4 }}
spec:
  template:
    metadata:
      name: {{ include "spark-operator.fullname" . }}-webhook-cleanup
      {{- if .Values.istio.enabled }}
      annotations:
        "sidecar.istio.io/inject": "false"
      {{- end }}
      {{- if .Values.webhook.cleanupPodLabels }}
      labels:
        {{- toYaml .Values.webhook.cleanupPodLabels | nindent 8 }}
      {{- end }}
    spec:
      serviceAccountName: {{ include "spark-operator.serviceAccountName" . }}
      restartPolicy: OnFailure
      {{- with .Values.imagePullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      containers:
      - name: clean-secret
        image: {{ .Values.image.repository }}:{{ default .Chart.AppVersion .Values.image.tag }}
        imagePullPolicy: {{ .Values.image.pullPolicy }}
        securityContext:
          {{- toYaml .Values.securityContext | nindent 10 }}
        command:
        - "/bin/sh"
        - "-c"
        - "curl -ik \
          -X DELETE \
          -H \"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)\" \
          -H \"Accept: application/json\" \
          -H \"Content-Type: application/json\" \
          https://kubernetes.default.svc/api/v1/namespaces/{{ .Release.Namespace }}/secrets/{{ include "spark-operator.fullname" . }}-webhook-certs \
          && \
          curl -ik \
          -X DELETE \
          -H \"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)\" \
          -H \"Accept: application/json\" \
          -H \"Content-Type: application/json\" \
          --data \"{\\\"kind\\\":\\\"DeleteOptions\\\",\\\"apiVersion\\\":\\\"batch/v1\\\",\\\"propagationPolicy\\\":\\\"Foreground\\\"}\" \
          https://kubernetes.default.svc/apis/batch/v1/namespaces/{{ .Release.Namespace }}/jobs/{{ include "spark-operator.fullname" . }}-webhook-init"
        resources:
          {{- toYaml .Values.webhook.cleanupResources | nindent 10 }}
      {{- with .Values.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
{{ end }}
Run the upgrade command:
helm upgrade spark-operator ./SparkOperator --namespace {namespace where your EMR Spark is deployed} --set batchScheduler.enable=true --set webhook.enable=true
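As a quick sanity check (a sketch, not part of the official procedure: it assumes the operator Deployment is named after the Helm release and that this chart version maps batchScheduler.enable to an -enable-batch-scheduler operator flag):

# confirm the operator pod restarted and batch scheduling was turned on
kubectl get pods -n {namespace where your EMR Spark is deployed}
kubectl get deployment spark-operator -n {namespace where your EMR Spark is deployed} -o yaml | grep -i batch-scheduler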
apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: spark-wordcount spec: type: Scala sparkVersion: 3.2.1 batchScheduler: "volcano" mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "tos://xxx/spark-examples_2.12-3.3.3.jar" arguments: - "1000" driver: cores: 1 coreLimit: 1000m memory: 8g executor: cores: 1 coreLimit: 1000m memory: 4g memoryOverhead: 2g instances: 1
After the job is submitted, you can check its scheduling through the driver pod events:
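For example (a minimal sketch; the driver pod follows the {SparkApplication name}-driver naming convention, and the namespace placeholder is yours to fill in):

kubectl describe pod spark-wordcount-driver -n {your EMR Spark namespace}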
Events:
  Type    Reason     Age   From     Message
  ----    ------     ----  ----     -------
  Normal  Scheduled  27s   volcano  Successfully assigned emr-3ia0hq36gf3bd5my2wxh-spark/spark-wordcount-driver to 172.16.2.27
You can also inspect Volcano's default queue to see how resources are allocated.
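For example (a minimal sketch; Volcano ships with a cluster-scoped queue named default):

# Allocated under Status shows the resources currently occupied in the queue
kubectl describe queue default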
If you want to constrain and control a job with a specific queue, you can create a Volcano queue manually:
cat << EOF > sparkQ.yaml
apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
  name: sparkqueue
spec:
  weight: 1
  reclaimable: false
  capability:
    cpu: 4
    memory: 16Gi
EOF
kubectl apply -f sparkQ.yaml
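You can confirm that the queue was created and is open for scheduling (the queue name matches the one defined above):

kubectl get queue sparkqueue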
A Volcano queue is an entity that holds a group of PodGroups; it is the basis of resource isolation and determines how that group of PodGroups shares cluster resources. You can submit Spark/Ray jobs to a queue. Modify the SparkApplication YAML above to create a job that uses the queue:
apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: spark-wordcount-2 spec: type: Scala sparkVersion: 3.2.1 batchScheduler: "volcano" batchSchedulerOptions: queue: "sparkqueue" mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "tos://{ 您的 bucket }/SparkJobs/spark-examples_2.12-3.3.3.jar" arguments: - "1000" driver: cores: 1 coreLimit: 1000m memory: 8g executor: cores: 1 coreLimit: 1000m memory: 4g memoryOverhead: 2g instances: 1
You can query the Volcano queue's usage information to see how the job is being scheduled:
kubectl describe queue {queue name created above} -n {your namespace}
...
Spec:
  Capability:
    Cpu:        5
    Memory:     15Gi
  Reclaimable:  false
  Weight:       4
Status:
  Allocated:
    Cpu:                         2
    Memory:                      15155Mi
    Pods:                        2
    vke.volcengine.com/eni-ip:   2
Upgrade using the helm installation directory
kubectl get pods -n {your EMR cluster ID}
kubectl exec -it {agent pod name from the previous step} -n {your EMR cluster ID} -- /bin/bash
cd /tmp/RAY/1.0.0/client/charts
(Optional) If your cluster version is earlier than EMR-VKE-1.4.0, manually apply the following upgrade:
{{- if .Values.rbacEnable }}
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  labels:
{{ include "kuberay-operator.labels" . | indent 4 }}
  name: {{ include "kuberay-operator.fullname" . }}
subjects:
- kind: ServiceAccount
  name: {{ .Values.serviceAccount.name }}
  namespace: {{ .Release.Namespace }}
roleRef:
  kind: ClusterRole
  name: {{ include "kuberay-operator.fullname" . }}
  apiGroup: rbac.authorization.k8s.io
{{- end }}
{{- if .Values.rbacEnable }}
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  labels:
{{ include "kuberay-operator.labels" . | indent 4 }}
  name: {{ include "kuberay-operator.fullname" . }}
rules:
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["coordination.k8s.io"]
  resources: ["leases"]
  verbs: ["create", "get", "list", "update"]
- apiGroups: [""]
  resources: ["events"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"]
- apiGroups: [""]
  resources: ["pods/status"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: [""]
  resources: ["serviceaccounts"]
  verbs: ["create", "delete", "get", "list", "watch"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: [""]
  resources: ["services/status"]
  verbs: ["get", "patch", "update"]
- apiGroups: ["extensions"]
  resources: ["ingresses"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["networking.k8s.io"]
  resources: ["ingressclasses"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["networking.k8s.io"]
  resources: ["ingresses"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["ray.io"]
  resources: ["rayclusters"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["ray.io"]
  resources: ["rayclusters/finalizers"]
  verbs: ["update"]
- apiGroups: ["ray.io"]
  resources: ["rayclusters/status"]
  verbs: ["get", "patch", "update"]
- apiGroups: ["ray.io"]
  resources: ["rayjobs"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["ray.io"]
  resources: ["rayjobs/finalizers"]
  verbs: ["update"]
- apiGroups: ["ray.io"]
  resources: ["rayjobs/status"]
  verbs: ["get", "patch", "update"]
- apiGroups: ["ray.io"]
  resources: ["rayservices"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["ray.io"]
  resources: ["rayservices/finalizers"]
  verbs: ["update"]
- apiGroups: ["ray.io"]
  resources: ["rayservices/status"]
  verbs: ["get", "patch", "update"]
- apiGroups: ["rbac.authorization.k8s.io"]
  resources: ["rolebindings"]
  verbs: ["create", "delete", "get", "list", "watch"]
- apiGroups: ["rbac.authorization.k8s.io"]
  resources: ["roles"]
  verbs: ["create", "delete", "get", "list", "update", "watch"]
- apiGroups: ["route.openshift.io"]
  resources: ["routes"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
{{- if .Values.batchScheduler.enabled }}
- apiGroups: ["scheduling.volcano.sh"]
  resources: ["podgroups"]
  verbs: ["create", "delete", "get", "list", "update", "watch"]
- apiGroups: ["apiextensions.k8s.io"]
  resources: ["customresourcedefinitions"]
  verbs: ["get"]
{{- end }}
{{- end }}
Run the upgrade command to allow Ray to use the batch scheduler:
helm upgrade kuberay-operator ./kuberay-operator --namespace {namespace where your KubeRay operator is deployed} --set batchScheduler.enabled=true
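As a quick sanity check (a sketch under the assumption that this chart version maps batchScheduler.enabled to the operator's --enable-batch-scheduler flag; flag names may differ in older KubeRay releases):

# confirm the operator was redeployed with batch scheduling enabled
kubectl get deployment kuberay-operator -n {namespace where your KubeRay operator is deployed} -o yaml | grep -i batch-scheduler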
cat << EOF > rayQ.yaml
apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
  name: rayqueue
spec:
  weight: 1
  reclaimable: false
  capability:
    cpu: 5
    memory: 10Gi
EOF
kubectl apply -f rayQ.yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: ray-volcano-cluster
  labels:
    ray.io/scheduler-name: volcano
    volcano.sh/queue-name: rayqueue
spec:
  rayVersion: '2.7.0'
  headGroupSpec:
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-head
          image: {your Ray image}
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"
            requests:
              cpu: "1"
              memory: "2Gi"
  workerGroupSpecs:
  - groupName: worker
    rayStartParams: {}
    replicas: 2
    minReplicas: 2
    maxReplicas: 2
    template:
      spec:
        containers:
        - name: ray-head
          image: {your Ray image}
          resources:
            limits:
              cpu: "1"
              memory: "1Gi"
            requests:
              cpu: "1"
              memory: "1Gi"
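To submit the cluster and watch Volcano gang-schedule it (a sketch assuming the manifest above was saved as ray-volcano-cluster.yaml; recent KubeRay versions create a PodGroup named ray-{cluster name}-pg, but the naming may differ in yours):

kubectl apply -f ray-volcano-cluster.yaml -n {your namespace}
# the PodGroup created for the RayCluster reflects the gang-scheduling status
kubectl get podgroup -n {your namespace}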
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample-0702
  labels:
    ray.io/scheduler-name: volcano
    volcano.sh/queue-name: rayqueue
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /$1
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  shutdownAfterJobFinishes: true
  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  ttlSecondsAfterFinished: 3000
  runtimeEnvYAML: |
    env_vars:
      counter_name: "test_counter"
  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.3' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      # enable ingress
      enableIngress: true
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
          - name: ray-head
            image: {your Ray image}
            ports:
            - containerPort: 6379
              name: gcs-server
            - containerPort: 8265 # Ray dashboard
              name: dashboard
            - containerPort: 10001
              name: client
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"
            volumeMounts:
            - mountPath: /home/ray/samples
              name: code-sample
          volumes:
          - name: code-sample
            configMap:
              # Provide the name of the ConfigMap you want to mount.
              name: ray-job-code-sample
              # An array of keys from the ConfigMap to create as files
              items:
              - key: sample_code.py
                path: sample_code.py
    workerGroupSpecs:
    # the pod replicas in this group typed worker
    - replicas: 1
      minReplicas: 1
      maxReplicas: 5
      # logical group name, for this called small-group, also can be functional
      groupName: small-group
      # The `rayStartParams` are used to configure the `ray start` command.
      # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
      # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
      rayStartParams: {}
      # pod template
      template:
        spec:
          containers:
          - name: ray-worker
            image: {your Ray image}
            lifecycle:
              preStop:
                exec:
                  command: [ "/bin/sh", "-c", "ray stop" ]
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    assert requests.__version__ == "2.31.0"
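To submit and track the RayJob (a sketch assuming the manifest above was saved as rayjob-volcano.yaml):

kubectl apply -f rayjob-volcano.yaml -n {your namespace}
# the printed columns include the job status and deployment status
kubectl get rayjob rayjob-sample-0702 -n {your namespace}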
RayHead:
Events:
  Type    Reason     Age   From     Message
  ----    ------     ----  ----     -------
  Normal  Scheduled  36s   volcano  Successfully assigned emr-3ia0hq36e35exzu87met-ray/ray-volcano-cluster-head-74v9n to 172.16.2.25
Queue:
Spec:
  Capability:
    Cpu:        5
    Memory:     15Gi
  Reclaimable:  false
  Weight:       4
Status:
  Allocated:
    Cpu:                         3
    Memory:                      4Gi
    Pods:                        3
    vke.volcengine.com/eni-ip:   3