# Job configuration for a Ray job: one `set key = value;` statement per line.
# To use a custom image, add this setting and replace the image address.
# set serverless.ray.image = image-repo-cn-beijing.cr.volces.com/emr-serverless-ray/ray:2.9.0-patch;
# When a custom image is used, the Ray version must match the image version.
# set serverless.ray.version = 2.9.0;
set serverless.ray.head.cpu = 1;
set serverless.ray.head.memory = 4Gi;
set serverless.ray.worker.cpu = 1;
set serverless.ray.worker.memory = 4Gi;
set serverless.ray.worker.replicas = 1;
# entrypoint_cmd format: /home/ray/workdir/{zip package name}/{py file name}
# /home/ray/workdir is currently a fixed value.
set serverless.ray.entrypoint.cmd = python /home/ray/workdir/cray_demos/svpc/tlest_redisp.py;
# TOS file path of the bundle.
set serverless.ray.entrypoint.bundle.path = tos://wbwemr-devqa/klehui/demo/cray/ray_demossvpc.zip;
set serverless.ray.runtime.env.json = {"pip":["redis"],"env_vars":{"counter_name":"test_counter"}};
说明
命令中的 tos://wbwemr-devqa/klehui/demo/cray/ray_demossvpc.zip
需替换为用户实际的 TOS 文件路径。
区域 | python版本 | 镜像 | 其他说明 |
---|---|---|---|
华东 | 3.9 | emr-vke-qa-cn-beijing.cr.volces.com/emr/ray:2.22.0-py3.9-ubuntu20.04-178 | |
说明
Ray基础镜像请参考:Ray镜像列表。
$ pip3 install python_serverless-1.0.1-py3-none-any.whl
SDK安装包:
# Sample script: submit a RayJob through the serverless SDK, poll it until it
# finishes, and print its tracking/result links and logs. Helper entry points
# for inspecting, cancelling, and reading the driver log of an existing job
# are included as well.
import time

from serverless import StaticCredentials
from serverless import ServerlessClient
from serverless import RayJobTask
from serverless import JobStatus
from serverless.exceptions import QuerySdkError

# Placeholders: replace with the user's real access key / secret key.
ak = '<此处填写用户实际的 AK>'
sk = '<此处填写用户实际的 SK>'


def init():
    """Build a ServerlessClient from the module-level AK/SK credentials."""
    credentials = StaticCredentials(ak, sk)
    # Initialize the client.
    return ServerlessClient(
        credentials,
        endpoint='open.volcengineapi.com',
        service='emr_serverless',
        region='cn-beijing',
        connection_timeout=30,
        socket_timeout=30,
    )


def _print_log_pages(log_cursor):
    """Fetch every remaining page of ``log_cursor`` and print each entry."""
    while log_cursor.has_next():
        log_cursor.fetch_next_page()
        for log_entry in log_cursor.current_rows:
            print(log_entry)


def submit():
    """Submit a RayJob asynchronously, poll until it finishes, then print its logs."""
    client = init()
    # Submit the job.
    job = client.execute(
        task=RayJobTask(
            name="test rayjob task",
            conf={
                # To use a custom image, add these settings and replace the
                # image address.
                # "serverless.ray.version": "2.22.0",
                # "serverless.ray.image": "emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-ray/ray:2.30.0-py3.11-ubuntu20.04-240"
            },
            # NOTE(review): the original comment here read "default spec is
            # 4c/16g", yet this sample requests 64 CPU / 256Gi — confirm the
            # intended size before running.
            head_cpu='64',
            head_memory='256Gi',
            worker_cpu='64',
            worker_memory='256Gi',
            worker_replicas=4,
            # entrypoint_cmd format: /home/ray/workdir/{zip package name}/{py file name}
            # /home/ray/workdir is currently a fixed value.
            entrypoint_cmd='python /home/ray/workdir/ray_demo/sleep.py',
            # TOS file path of the uploaded bundle.
            entrypoint_resource='tos://emr-qa/lei/demo/ray/ray_demo.zip',
            runtime_env={
                # pip packages to install into the runtime environment.
                "pip": ["s3fs"],
                "env_vars": {
                    "counter_name": "test_counter",
                },
            },
            # queue=xxx
        ),
        is_sync=False,
    )

    # Get the execution UI link.
    # time.sleep(20)
    # print('RayJob UI: %s' % job.get_tracking_url())
    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            # Best effort: presumably the tracking URL is not available until
            # the job has actually started — keep polling.
            pass
        time.sleep(3)

    # The loop only establishes that the job is finished, not that it
    # succeeded, so report the final status instead of claiming success.
    print('The task finished, status: %s' % job.status)
    print('Tracking ui: %s' % job.get_tracking_url())
    print('Result ui: %s' % job.get_result_url())

    # Show the execution log.
    _print_log_pages(client.get_submission_log(job))


def get_ray_ui(job_id=280218916):
    """Print basic attributes and the tracking URL of an existing job.

    ``job_id`` defaults to the sample id used by the original script.
    """
    client = init()
    job = client.get_job(job_id)
    # getattr() is the supported way to read an attribute by name; calling
    # __getattribute__ directly skips any __getattr__ fallback.
    for field in ("start_time", "end_time", "status", "queue_name", "conf"):
        print('%s: %s' % (field, getattr(job, field)))
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    """Look up the job with ``job_id`` and cancel it."""
    client = init()
    client.cancel_job(client.get_job(job_id))


def query_log(job_id: str):
    """Print the full driver log of the job with ``job_id``."""
    client = init()
    job = client.get_job(job_id)
    _print_log_pages(client.get_driver_log(job))


if __name__ == '__main__':
    submit()
    # query_log('280232151')
    # cancel_job('280233741')
说明
entrypoint_resource='tos://emr-qa/lei/demo/ray/ray_demo.zip'
为 ray_demo.zip 包上传到 TOS 后的路径。