部署Ray的环境。建议在EMR on VKE产品中部署Ray服务,参考EMR官网进行环境部署。也可以按照官网执行pip install ray方式进行手动部署。
单机 Python 程序改造为 Ray 程序
示例:
计算一组数字的平方。
传统单机 Python 程序
改造为 Ray 程序
def compute_square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
squares = [compute_square(x) for x in numbers]
print(squares)
import ray
# 初始化 Ray
ray.init()
# 将函数转换为 Ray 任务
@ray.remote
def compute_square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
# 异步调用 Ray 任务
futures = [compute_square.remote(x) for x in numbers]
# 获取任务结果
squares = ray.get(futures)
print(squares)
# 关闭 Ray
ray.shutdown()
改造步骤:
导入 Ray 并初始化 Ray 集群。
使用 @ray.remote 装饰器将函数转换为 Ray 任务。
使用 .remote() 方法调用 Ray 任务。
使用 ray.get() 获取任务结果。
多进程改造为 Ray 程序
示例:
并行处理多个数据块。
传统多进程 Python 程序
Ray 程序
import multiprocessing
def process_data(data):
return [x * 2 for x in data]
data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
with multiprocessing.Pool(processes=3) as pool:
results = pool.map(process_data, data_chunks)
print(results)
import ray
# 初始化 Ray
ray.init()
# 将函数转换为 Ray 任务
@ray.remote
def process_data(data):
return [x * 2 for x in data]
data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
# 异步调用 Ray 任务
futures = [process_data.remote(chunk) for chunk in data_chunks]
# 获取任务结果
results = ray.get(futures)
print(results)
# 关闭 Ray
ray.shutdown()
# 不推荐的做法
results = [ray.get(my_function.remote(x)) for x in range(4)]
# 推荐的做法
result_ids = [my_function.remote(x) for x in range(4)]
results = ray.get(result_ids)
result_ids = [my_function.remote(x) for x in range(4)]
while result_ids:
done_id, result_ids = ray.wait(result_ids)
result = ray.get(done_id[0])
process_result(result)