You need to enable JavaScript to run this app.
导航
将传统的 Python 程序改造为 Ray 程序的实践指南
最近更新时间:2024.09.04 20:01:42首次发布时间:2024.09.04 19:50:08

Ray 是一个灵活的分布式计算框架,支持并行任务和基于 Actor 模型的异步执行。通过将传统的 Python 程序改造为 Ray 程序,可以充分利用 Ray 的并行和分布式计算能力,提高程序的性能和扩展性。本文将详细介绍如何将单机 Python 程序和多进程程序改造为Ray程序。

Ray API介绍

API

描述

示例

ray.init()

初始化Ray上下文。

@ray.remote

函数或类的装饰器,指定函数将作为task执行以及类将作为actor,在不同的进程中执行。

@ray.remote
class Actor(object):
    def method(y)
        ...                                     …

.remote

用于每个远程函数、远程类声明或远程类方法调用。远程操作是异步的。

ret_id = fun.remote(x)
a = Actor.remote()
ret_id = a.method.remote(y)

ray.put()

将对象存储在object store中,并返回其ID。此ID可用于将对象作为参数传递给任何远程函数或方法调用。这是一个同步操作。

x_id = ray.put(x)

ray.get()

从对象ID或对象ID列表中返回一个对象或对象列表。这是一个同步(即阻塞)操作。

x = ray.get(x_id)
…
objects = ray.get(object_ids)

ray.wait()

从对象ID列表中返回:已准备好的对象ID列表,和尚未准备好的对象ID列表。

ready_ids, not_ready_ids = ray.wait(object_ids)

将传统Python程序改造为Ray程序场景示例

准备工作

部署Ray的环境。建议在EMR on VKE产品中部署Ray服务,参考EMR官网进行环境部署。也可以按照官网执行pip install ray方式进行手动部署。

单机 Python 程序改造为 Ray 程序

示例:
计算一组数字的平方。

传统单机 Python 程序

改造为 Ray 程序

def compute_square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]
squares = [compute_square(x) for x in numbers]
print(squares)
import ray

# 初始化 Ray
ray.init()

# 将函数转换为 Ray 任务
@ray.remote
def compute_square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

# 异步调用 Ray 任务
futures = [compute_square.remote(x) for x in numbers]

# 获取任务结果
squares = ray.get(futures)
print(squares)

# 关闭 Ray
ray.shutdown()

改造步骤:

  1. 导入 Ray 并初始化 Ray 集群。
  2. 使用 @ray.remote 装饰器将函数转换为 Ray 任务。
  3. 使用 .remote() 方法调用 Ray 任务。
  4. 使用 ray.get() 获取任务结果。

多进程改造为 Ray 程序

示例:
并行处理多个数据块。

传统多进程 Python 程序

Ray 程序

import multiprocessing

def process_data(data):
    return [x * 2 for x in data]

data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

with multiprocessing.Pool(processes=3) as pool:
    results = pool.map(process_data, data_chunks)

print(results)
import ray

# 初始化 Ray
ray.init()

# 将函数转换为 Ray 任务
@ray.remote
def process_data(data):
    return [x * 2 for x in data]

data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

# 异步调用 Ray 任务
futures = [process_data.remote(chunk) for chunk in data_chunks]

# 获取任务结果
results = ray.get(futures)
print(results)

# 关闭 Ray
ray.shutdown()

改造为 Ray 程序的步骤

  1. 导入 Ray 并初始化 Ray 集群。
  2. 使用 @ray.remote 装饰器将函数转换为 Ray 任务。
  3. 使用 .remote() 方法调用 Ray 任务。
  4. 使用 ray.get() 获取任务结果。

改造为 Ray 程序的优势:

  • 高效的数据处理:
    • 在Ray提供有共享的对象存储,避免了Python multiprocessing在进程间传递大对象时的序列化和反序列化开销。
    • 对于有状态的计算:Ray提供了actor抽象,使得在分布式环境中封装和修改状态变得简单,而Python multiprocessing缺乏这种自然的并行化方式,导致实现复杂且性能低下。
  • 扩展性和容错性:
    • 分布式计算:Ray设计用于可扩展性,可以在单机、集群甚至云环境中运行相同的代码。它能够轻松扩展到数百甚至数千个节点。
    • 自动故障恢复:Ray工作负载自动从机器和进程故障中恢复,确保计算任务的可靠性和稳定性。

使用Ray Data模块改造pipeline链路

Ray data模块采用Dataset定义分布的数据集。当需要对数据集进行一系列的pipeline操作时,可以采用Ray data模型。
图片
示例:

multiprocessing的示例

Ray示例

import multiprocessing
import time


# 数据处理步骤 1
def process_step1(data):
    time.sleep(1)
    return [item * 2 for item in data]

# 数据处理步骤 2
def process_step2(data):
    time.sleep(1)
    return [item + 5 for item in data]

# 数据处理步骤 3
def process_step3(data):
    time.sleep(1)
    return [item * 3 for item in data]

if __name__ == '__main__':
    data = [i for i in range(10)]

    # 使用 multiprocessing 进行并行处理
    pool = multiprocessing.Pool()

    step1_results = pool.apply_async(process_step1, (data,))
    step2_input = step1_results.get()
    step2_results = pool.apply_async(process_step2, (step2_input,))
    step3_input = step2_results.get()
    final_results = pool.apply_async(process_step3, (step3_input,))

    pool.close()
    pool.join()

    print(final_results.get())
import ray
import time
from typing import Dict

ray.init()

# 数据处理步骤 1 data的类型为:Dict[str, int]
def process_step1(data):
    time.sleep(1)
    return {'id':[item * 2 for item in data['id']]}

# 数据处理步骤 2
def process_step2(data):
    time.sleep(1)
    return {'id':[item + 5 for item in data['id']]}

# 数据处理步骤 3
def process_step3(data):
    time.sleep(1)
    return {'id':[item * 3 for item in data['id']]}

if __name__ == '__main__':
    # 启动数据生成任务
    result = ray.data.range(10)\
        .map_batches(process_step1)\
        .map_batches(process_step2)\
        .map_batches(process_step3)\
        .take_all()

    # ray.data.Dataset可以带有schema元数据。
    # 这里返回结果{'id': 27}, {'id': 15}, {'id': 21}, {'id': 69}, {'id': 39}, {'id': 63}, {'id': 57}, {'id': 51}, {'id': 33}, {'id': 45}]
    print(result)

    ray.shutdown()

使用Ray API的注意事项

调用远程函数和方法

远程函数和方法调用是异步的,返回一个对象ID(ObjectID),而不是实际结果。要获取实际结果,需要使用ray.get()

# 异步调用
result_id = my_function.remote(5)

# 获取结果
result = ray.get(result_id)

延迟调用ray.get()

ray.get()是一个阻塞操作,会等待结果准备好。如果过早调用ray.get()会阻塞程序的其他操作,影响并行性。建议在调用所有任务后再调用ray.get()

# 不推荐的做法
results = [ray.get(my_function.remote(x)) for x in range(4)]

# 推荐的做法
result_ids = [my_function.remote(x) for x in range(4)]
results = ray.get(result_ids)

避免微小任务

每个远程任务调用都有一定的开销。如果任务非常小,Ray程序可能比等效的Python程序更慢。建议将任务设计为至少耗时几毫秒,以摊薄调用开销。

# 不推荐的做法
@ray.remote
def tiny_task(x):
    return x

# 推荐的做法
@ray.remote
def larger_task(start, end):
    return [x for x in range(start, end)]

使用ray.put()存储大对象

当将大对象作为参数传递给远程函数时,Ray会在后台调用ray.put()将对象存储在本地对象存储中。如果多次传递相同的大对象,建议显式调用ray.put()并传递其ID,以避免重复复制对象。

# 不推荐的做法
result_ids = [my_function.remote(large_object) for _ in range(10)]

# 推荐的做法
large_object_id = ray.put(large_object)
result_ids = [my_function.remote(large_object_id) for _ in range(10)]

使用ray.wait()尽早处理结果

如果对多个任务的结果使用ray.get(),需要等待最后一个任务完成,这会增加程序运行时间。建议使用ray.wait(),它会在任意一个对象准备好时返回,从而可以尽早处理结果。

result_ids = [my_function.remote(x) for x in range(4)]
while result_ids:
    done_id, result_ids = ray.wait(result_ids)
    result = ray.get(done_id[0])
    process_result(result)