Ray 是一个开源的分布式计算框架,旨在简化在多种计算资源上运行分布式应用程序的过程。Ray提供了一套简单直观的 API,使得开发者能够轻松地构建、部署和管理大规模的分布式应用程序。基于此分布式计算框架,Ray 提供了诸多面向数据科学和人工智能的上层库。
Ray 的分布式计算底盘,提供了 low level 的编程 API,可以让用户方便的把单机程序转为分布式计算任务。
特点:
相关概念:
基于 Ray Core 实现的面向数据可选和人工智能的库,可实现数据读取、加工、训练、推理等一站式 pipeline。
Ray 集群是一组Worker节点和一个Head节点组成。Ray 集群可以是固定大小的,也可以根据在集群上运行的应用程序负载进。请参考官网Ray Clusters Overview
Ray Cluster部署后,便可以在Ray Cluster上提交作业。提交作业有两种方式,具体使用方式可以参考RayCluster快速入门和RayJob快速入门。
Ray的核心API包括以下几个关键概念:
ray.init()
: 初始化Ray集群。你可以传递一个地址来连接到一个已存在的集群。@ray.remote
: 将函数转换为任务,将类转换为actor(演员)。ray.put()
: 将值放入Ray的对象存储。ray.get()
: 从对象存储中获取值。返回你之前放入的值,或者是任务或actor计算的结果。.remote()
: 在Ray集群上运行actor方法或任务,并用于实例化actor。ray.wait()
: 返回两个对象引用列表,一个包含已完成的任务,另一个包含未完成的任务。示例:
通过一个简单的例子来理解Ray是如何工作的。假设有一个数据库,想要从数据库中检索数据。以下是使用Ray进行并行化处理的示例:
import ray import time start = time.time() # 模拟数据库的Python列表 database = ["Learning", "Ray", "Flexible", "Distributed", "Python", "for", "Machine", "Learning"] # 使用ray.remote装饰器将函数转换为Ray任务 @ray.remote def retrieve_task(item): time.sleep(item / 10.) return item, database[item] # 启动Ray任务并收集它们的引用 object_references = [retrieve_task.remote(item) for item in range(5)] # 使用ray.get()获取任务结果 data = ray.get(object_references) # 打印获取数据所需的时间 print(f'Runtime: {time.time() - start:.2f} seconds, data:') print(*data, sep="\n")