You need to enable JavaScript to run this app.
导航
Serverless Ray 开发指南
最近更新时间:2024.10.11 20:33:38 | 首次发布时间:2024.10.11 20:30:27

Ray作业管理

可视化提交

  • 提交作业
    在 EMR Serverless Spark 控制台的"作业管理 > 创建作业"中,单击 RayJob,然后在创建作业页面编写作业并提交。
  • 作业demo
# 如需使用自定义镜像,添加该配置,替换镜像地址
# set serverless.ray.image = image-repo-cn-beijing.cr.volces.com/emr-serverless-ray/ray:2.9.0-patch;
# 如使用自定义镜像,ray 版本需要与镜像版本一致
# set serverless.ray.version = 2.9.0;

set serverless.ray.head.cpu = 1;
set serverless.ray.head.memory = 4Gi;
set serverless.ray.worker.cpu = 1;
set serverless.ray.worker.memory = 4Gi;
set serverless.ray.worker.replicas = 1;
# serverless.ray.entrypoint.cmd 格式为 python /home/ray/workdir/{zip包名}/{py文件名}
# 其中 /home/ray/workdir 目前为固定值
set serverless.ray.entrypoint.cmd = python /home/ray/workdir/crossvpc/test_redis.py;
# tos 文件路径
set serverless.ray.entrypoint.bundle.path = tos://wbw-dev/kehu/demo/crossvpc.zip;
set serverless.ray.runtime.env.json = {"pip":["redis"],"env_vars":{"counter_name":"test_counter"}};

自定义镜像

区域

python版本

镜像

其他说明

华东

3.9

emr-vke-qa-cn-beijing.cr.volces.com/emr/ray:2.22.0-py3.9-ubuntu20.04-178

  • ray:2.22.0
  • 操作系统:Ubuntu 20.04

说明

Ray基础镜像请参考:Ray镜像列表

PythonSDK使用文档

SDK 安装

$ pip3 install python_serverless-1.0.1-py3-none-any.whl

说明

其中 python_serverless-1.0.1-py3-none-any.whl 为安装包名称;如需获取安装包,请联系火山引擎工作人员。

创建RayJob任务

  • 提交任务
import time

from serverless import StaticCredentials
from serverless import ServerlessClient
from serverless import RayJobTask
from serverless import JobStatus
from serverless.exceptions import QuerySdkError

ak = 'xx'
sk = 'xx'


def init():
    """Create a ServerlessClient authenticated with the module-level ak/sk.

    Returns:
        A ServerlessClient bound to the EMR Serverless service in the
        cn-beijing region, with 30-second connection and socket timeouts.
    """
    # Connection settings are collected first so they read as one unit.
    client_options = {
        'endpoint': 'open.volcengineapi.com',
        'service': 'emr_serverless',
        'region': 'cn-beijing',
        'connection_timeout': 30,
        'socket_timeout': 30,
    }
    return ServerlessClient(StaticCredentials(ak, sk), **client_options)


def submit():
    """Submit a sample RayJob, poll it until it finishes, then print its logs.

    The job is submitted asynchronously (``is_sync=False``). Its status is
    refreshed every 3 seconds until ``JobStatus.is_finished`` reports a
    finished status. The final status, the tracking/result URLs and the
    submission log are then printed.
    """
    client = init()

    task = RayJobTask(
        name="test rayjob task",
        conf={
            # To use a custom image, add these settings and replace the image
            # address.
            # NOTE(review): the Ray version is expected to match the image's
            # Ray version; the sample values below differ - confirm before use.
            # "serverless.ray.version": "2.22.0",
            # "serverless.ray.image": "emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-ray/ray:2.30.0-py3.11-ubuntu20.04-240"
        },
        # Explicit head/worker sizing for this example.
        # NOTE(review): the original note says the default spec is 4c/16g,
        # presumably what applies when these arguments are omitted - confirm.
        head_cpu='64',
        head_memory='256Gi',
        worker_cpu='64',
        worker_memory='256Gi',
        worker_replicas=4,
        # The entrypoint script path has the form
        # /home/ray/workdir/{zip name}/{py file name};
        # /home/ray/workdir is currently a fixed value.
        entrypoint_cmd='python /home/ray/workdir/ray_demo/sleep.py',
        # TOS path of the zip bundle that contains the entrypoint script.
        entrypoint_resource='tos://emr-qa/lei/demo/ray/ray_demo.zip',
        runtime_env={
            # pip packages to install into the runtime environment.
            "pip": ["s3fs"],
            "env_vars": {
                "counter_name": "test_counter",
            },
        },
        # queue=xxx
    )
    job = client.execute(task=task, is_sync=False)

    # Optionally wait a while and print the job UI link right after submission:
    # time.sleep(20)
    # print('RayJob UI: %s' % job.get_tracking_url())

    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print(f'Id: {job.id}, Status: {job.status}')
        try:
            print(f'Tracking ui: {job.get_tracking_url()}')
        except QuerySdkError:
            # Best effort: the tracking URL lookup can fail while polling;
            # keep polling instead of aborting.
            pass
        time.sleep(3)

    # A finished job is not necessarily a successful one, so report the
    # actual final status instead of unconditionally claiming success.
    print(f'The task finished with status: {job.status}')
    print(f'Tracking ui: {job.get_tracking_url()}')
    print(f'Result ui: {job.get_result_url()}')

    # Dump the submission log page by page.
    log_cursor = client.get_submission_log(job)
    while log_cursor.has_next():
        log_cursor.fetch_next_page()
        for log_entry in log_cursor.current_rows:
            print(log_entry)


def get_ray_ui(job_id=280218916):
    """Print the timing, status, queue, conf and tracking URL of a job.

    Args:
        job_id: ID of the job to look up. Defaults to the sample ID that was
            previously hard-coded, so existing no-argument calls behave the
            same.
    """
    client = init()
    job = client.get_job(job_id)
    # Plain attribute access replaces the direct __getattribute__ calls.
    print(f'start_time: {job.start_time}')
    print(f'end_time: {job.end_time}')
    print(f'status: {job.status}')
    print(f'queue_name: {job.queue_name}')
    print(f'conf: {job.conf}')
    print(f'tracking_url: {job.get_tracking_url()}')


def cancel_job(job_id: str):
    """Look up the job with the given ID and ask the client to cancel it."""
    client = init()
    target_job = client.get_job(job_id)
    client.cancel_job(target_job)


def query_log(job_id: str):
    """Print every driver-log entry of the given job, one page at a time."""
    client = init()
    cursor = client.get_driver_log(client.get_job(job_id))
    while cursor.has_next():
        # Advance to the next page before reading its rows.
        cursor.fetch_next_page()
        for entry in cursor.current_rows:
            print(entry)


if __name__ == '__main__':
    # Default demo action: submit the sample job.
    submit()
    # Alternative entry points; uncomment one and supply a real job ID.
    # query_log('280232151')
    # cancel_job('280233741')