You need to enable JavaScript to run this app.
导航
Ray 在多模态图片处理的架构实践
最近更新时间:2024.12.16 17:15:05首次发布时间:2024.12.16 17:02:20

在当今数字化时代,多模态数据处理尤其是图片处理成为了众多领域关注的焦点。从社交媒体平台的图像智能推荐,到医疗影像的精准分析,多模态图片处理技术正发挥着越来越重要的作用。而 Ray 作为一款强大的分布式计算框架,在多模态图片处理架构实践中展现出了卓越的性能与灵活性。

背景

本文以图片编辑的微调模型训练为样例,描述了在该场景上利用 Ray+Lance 架构完成升级带来的巨大收益。
Image
图片编辑模型的微调在处理过程一般会包含以下四个步骤:

  • 数据爬取:
    • 开放图片数据集,基于训练的目标去有偏向的选择数据集的搜集。
  • 数据过滤:
    • 高宽比信息
    • Url 数据来源
    • 光照和阴影
    • 美学分阈值
  • 模型推理处理:
    • 美学分打分
    • Caption
    • Clip
  • 模型训练微调
    • Stable Duffsion
    • FLUX.1-dev

历史架构

Image
在用户历史架构中需要关注如下五点。

  1. 数据存储:数据采集回来的存储方式是按照webdataset的形式组织。
    1. 使用webdataset可以解决小文件造成的对象存储QPS过高的问题。
    2. 缺点:如果随机读取文件或者基于某些过滤条件,webdataset的性能比较差,需要将tar包中的json解压出来,IO读放大影响严重。
  2. 数据分片:对数据进行预处理需要分布式时,使用的是手工拆分目录,然后每个机器只处理部分数据。
    1. 缺点:手工拆分数据集是个复杂事项,需要评估每个分配到的task的数据是否均匀。
    2. 缺点:如果在数据处理过程中有shuffle,或者汇聚的操作,则手工分布式是不同的任务,无法在同一个任务中处理不同的数据。
  3. 异构数据处理:cpu与gpu算子的都在同一个节点上,cpu是直接利用gpu的宿主机上的资源。
    1. 缺点:当CPU的算子和GPU算子的并发差别比较大时,会造成单算子的背压情况。
  4. 训练数据准备:Train的最后一公里 一般会使用一些简单的条件做一些数据集的筛选,因此客户选择的方案是将图片数据的标签存储到数据库,然后将图片按照parquet的列进行存储,因为parquet是只有图片内容单列,IO放大的问题会有所减轻。
    1. 缺点:需要关联两种数据源,还需要有关联逻辑。
    2. 缺点:Dataloader是一个完整列,会带上image_content列, 所以在加载过程中的Shuffle使用内存比较大,loader的性能比较慢
  5. 模型训练:通过Pytorch DDP实现单机多卡的分布式训练。

火山升级后架构

Image

  1. 数据存储:数据存储从原来的webdataset转成了lance的格式,lance在多模存储场景有很大的优势,lance将原有存储在json的元数据信息展开成了具体的列,同时把image的内容以binary类型持久化。lance具体的介绍可以详见 使用Lance数据格式训练CLIP多模模型
    1. 优点:元数据和数据可以同时以lance table的形式存储,
    2. 优点:随机读取和图片文件的QPS压力减轻
    3. 优点: 加列或者加标签的场景可以用Lance 的merge_columns能力
  2. 数据分片:使用Ray Data的方式直接对数据进行分片处理。
    1. 优点: Ray Data具备自动分片的能力,只需要设置并发数和num_cpus就可以完成并发量和资源消耗的控制。
    2. 优点: Ray Data本身具备背压的能力,可以控制任务执行的速率,防止oom的发生。
    3. 优点:火山的Ray Data增强了弹性扩缩能力,webdataset json展开的特性, 以及背压策略增加的场景。
  3. 异构数据处理:数据过滤使用的cpu,而推理的过程使用的是gpu,Ray本身可以在一个Pipeline中同时调度异构的资源类型。
    1. 优点:异构资源数据传递是内存传递,无需持久化,性能会更优,支持的迭代次数更频繁。
    2. 优点:算子可以在不同的资源池上触发,因此可以很好的支持算子不同的workload,例如filter的筛选率很高,所需cpu 比较高,这个时候cpu的资源池可以 预制的比较大。
    3. 优点:推理过程既可以使用在线推理也可以使用离线推理,在线推理可以实现资源资源预占和模型的预加载,如果推理内容的参数不是很大,可以通过Fastapi传递, 则可以使用在线推理。如果资源无需预占,可以使用离线推理。
  4. 训练数据准备:经过处理后使用lance来对数据进行存储, 在Dataloader中lance可以利用rowid主键作为shuffle的信息,因此加载到内存里的数量会减少很多,因此load的时间会大大减少。
def load_image(ds, idx):
    # Utility function to load an image at an index and convert it from bytes format to img format
    raw_img = ds.take([idx], columns=['image']).to_pydict()
    raw_img = np.frombuffer(b''.join(raw_img['image']), dtype=np.uint8)
    img = cv2.imdecode(raw_img, cv2.IMREAD_COLOR)
    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    return img

def load_caption(ds, idx):
    # Utility function to load an image's caption. Currently we return the longest caption of all
    captions = ds.take([idx], columns=['captions']).to_pydict()['captions'][0]
    return max(captions, key=len)

class CLIPLanceDataset(Dataset):
    """Custom Dataset to load images and their corresponding captions"""
    def __init__(self, lance_path, max_len=18, tokenizer=None, transforms=None, storage_options=None):
        self.ds = lance.dataset(lance_path, storage_options=storage_options)
        self.max_len = max_len
        # Init a new tokenizer if not specified already
        self.tokenizer = AutoTokenizer.from_pretrained('bert-base-cased') if not tokenizer else tokenizer
        self.transforms = transforms

    def __len__(self):
        return self.ds.count_rows()

    def __getitem__(self, idx):
        # Load the image and caption
        img = load_image(self.ds, idx)
        caption = load_caption(self.ds, idx)

        # Apply transformations to the images
        if self.transforms:
            img = self.transforms(img)

        # Tokenize the caption
        caption = self.tokenizer(
            caption,
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors='pt'
        )
        # Flatten each component of tokenized caption otherwise they will cause size mismatch errors during training
        caption = {k: v.flatten() for k, v in caption.items()}

        return img, caption 
  1. 模型训练:在Ray中使用Ray DDP的方式来改造原来单机多卡的ddp。在代码改造上,兼容性比较好,并且无需感知rank的位置。
    1. 优点:可以省略以下内容:
      • 初始化环境
      • 设置DDP model
      • 设置数据分片逻辑
      • 数据迁移到GPU
    2. 优点:可以利用Ray Data做数据加载。从而减少最后一公里的数据简单转换还需要持久化的过程。

Pytorch DDP VS Ray Pytorch DDP示例

def train_func(config):
    # Setup distributed env
    dist.init_process_group('nccl')
    rank = os.environ["LOCAL_RANK"]
    device = torch.device("cuda:" + rank)
    
    # Setup DDP model
    model = MyTorchModel(...)
    model = model.to(device)
    model = DDP(model, device_ids=[device])
    
    # Setup distributed sampler
    sampler = DistributedSampler(
        dataset,
        rank=os.environ["RANK"],     num_replicas=os.environ["WORLD_SIZE"],
        shuffle=True,
    )
    dataloader = DataLoader(
       ...,
        sampler=sampler
    )
    
    # Training
    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            # train batch
            inputs = inputs.to(device)
            labels = labels.to(device)
from ray.train.torch import prepare_model, prepare_data_loader, TorchTrainer

def train_func(config):
    model = MyTorchModel(...)
    model = prepare_model(model)
    dataloader = DataLoader(...)
    dataloader =  prepare_data_loader(dataloader)
    # Training
    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
           ...

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=16, use_gpu=True)
)
trainer.fit()

Ray Train + Ray Data示例

import ray.data
import ray.train

# Run On GPU
def train_func():
    dataset_shard = ray.train.get_dataset_shard("train")
    dataloader = dataset_shard.iter_torch_batches(...)
   ...
   
scaling_config = ray.train.ScalingConfig(num_workers=4, use_gpu=True)
# Run on CPU Nodes
dataset = ray.data.read_parquet(...).map_batches(...)
trainer = TorchTrainer(
        train_func,
        scaling_config = scaling_config,
        datasets = {"train": dataset},
    )
trainer.fit()

新架构收益

Webdataset2Lance

4TB/300CU 转成lacne格式,并且过滤一些异常图片异常url,Time Cost 1小时。

CPU MergeColumns

利用长宽属性增加一个label. big_pic(boolean),4TB/300CU,Time Cost 1.5分钟。

离线推理美学分

利用Ray的分布式架构进行离线推理Score,从原有的300CU的CPU到4卡GPU分布式,性能得到大幅度提升。
Image
Image
Image
Image

模型训练

  1. DataLoader : 4T 数据Parquet加载需要一个小时,而Lance 只需要几分钟, 同时还可以增加一些简单的条件筛选。
  2. Distributed Train: 单机的多进程改成了多机多并发的场景,从原有线下固定资源池迁移到线上按需资源计费,从原有的训练48h,优化到6h。