In today's digital era, multimodal data processing, and image processing in particular, has become a focus of attention across many fields. From intelligent image recommendation on social media platforms to precise analysis of medical imagery, multimodal image processing plays an increasingly important role. Ray, a powerful distributed computing framework, has demonstrated excellent performance and flexibility in multimodal image processing architectures.
Taking the fine-tuning of an image-editing model as its running example, this article describes the substantial gains obtained by upgrading this workload to a Ray + Lance architecture.
Fine-tuning an image-editing model generally involves the following four steps:
In the user's legacy architecture, the following five points deserve attention.
```python
import lance
import numpy as np
import cv2
from torch.utils.data import Dataset
from transformers import AutoTokenizer


def load_image(ds, idx):
    # Utility function to load an image at an index and convert it from bytes format to img format
    raw_img = ds.take([idx], columns=['image']).to_pydict()
    raw_img = np.frombuffer(b''.join(raw_img['image']), dtype=np.uint8)
    img = cv2.imdecode(raw_img, cv2.IMREAD_COLOR)
    # cv2 decodes to BGR, so convert to RGB for the model
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img


def load_caption(ds, idx):
    # Utility function to load an image's caption. Currently we return the longest caption of all
    captions = ds.take([idx], columns=['captions']).to_pydict()['captions'][0]
    return max(captions, key=len)


class CLIPLanceDataset(Dataset):
    """Custom Dataset to load images and their corresponding captions"""

    def __init__(self, lance_path, max_len=18, tokenizer=None, transforms=None, storage_options=None):
        self.ds = lance.dataset(lance_path, storage_options=storage_options)
        self.max_len = max_len
        # Init a new tokenizer if not specified already
        self.tokenizer = AutoTokenizer.from_pretrained('bert-base-cased') if not tokenizer else tokenizer
        self.transforms = transforms

    def __len__(self):
        return self.ds.count_rows()

    def __getitem__(self, idx):
        # Load the image and caption
        img = load_image(self.ds, idx)
        caption = load_caption(self.ds, idx)

        # Apply transformations to the images
        if self.transforms:
            img = self.transforms(img)

        # Tokenize the caption
        caption = self.tokenizer(
            caption,
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors='pt'
        )
        # Flatten each component of tokenized caption otherwise they will cause size mismatch errors during training
        caption = {k: v.flatten() for k, v in caption.items()}

        return img, caption
```
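The two trickiest details in the dataset above, the longest-caption rule and the flattening of the tokenizer output, can be sketched in isolation. This is a minimal illustration in pure Python with hypothetical names, using plain lists to stand in for tensors:

```python
def pick_longest_caption(captions):
    # Same rule as load_caption above: keep the longest of an image's captions
    return max(captions, key=len)


def flatten_tokenized(tokenized):
    # A tokenizer called with return_tensors='pt' yields fields of shape
    # (1, max_len); dropping the leading batch dimension gives (max_len,),
    # which is what the DataLoader's default collation expects per sample
    return {k: v[0] for k, v in tokenized.items()}
```

With lists standing in for tensors, `flatten_tokenized({'input_ids': [[101, 102]]})` yields `{'input_ids': [101, 102]}`.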
PyTorch DDP vs. Ray

PyTorch DDP example:
```python
import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler


def train_func(config):
    # Setup distributed env
    dist.init_process_group('nccl')
    local_rank = int(os.environ["LOCAL_RANK"])
    device = torch.device(f"cuda:{local_rank}")

    # Setup DDP model
    model = MyTorchModel(...)
    model = model.to(device)
    model = DDP(model, device_ids=[local_rank])

    # Setup distributed sampler (env vars are strings, so cast to int)
    sampler = DistributedSampler(
        dataset,
        rank=int(os.environ["RANK"]),
        num_replicas=int(os.environ["WORLD_SIZE"]),
        shuffle=True,
    )
    dataloader = DataLoader(
        ...,
        sampler=sampler
    )

    # Training
    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            # train batch
            inputs = inputs.to(device)
            labels = labels.to(device)
```
Ray Train example:

```python
from torch.utils.data import DataLoader

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, prepare_model, prepare_data_loader


def train_func(config):
    model = MyTorchModel(...)
    model = prepare_model(model)

    dataloader = DataLoader(...)
    dataloader = prepare_data_loader(dataloader)

    # Training
    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            ...


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=16, use_gpu=True)
)
trainer.fit()
```
Ray Train + Ray Data example:
```python
import ray.data
import ray.train
from ray.train.torch import TorchTrainer


# Runs on the GPU workers
def train_func():
    dataset_shard = ray.train.get_dataset_shard("train")
    dataloader = dataset_shard.iter_torch_batches(...)
    ...


scaling_config = ray.train.ScalingConfig(num_workers=4, use_gpu=True)

# Preprocessing runs on CPU nodes
dataset = ray.data.read_parquet(...).map_batches(...)

trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    datasets={"train": dataset},
)
trainer.fit()
```
Converting 4 TB of data (300 CU) to Lance format, while filtering out corrupted images and invalid URLs: time cost, 1 hour.
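A minimal sketch of the filtering applied during the conversion, assuming the rows carry hypothetical `image` (bytes) and `url` columns; the column names and validity rules are assumptions, not the production logic:

```python
def is_valid_record(rec):
    # Drop rows whose image payload is empty or whose URL is malformed;
    # both checks here are illustrative stand-ins for the real filters
    url = rec.get("url") or ""
    img = rec.get("image") or b""
    return url.startswith(("http://", "https://")) and len(img) > 0
```

In the pipeline this predicate would be applied as something like `ray.data.read_parquet(...).filter(is_valid_record)`, with the surviving rows then written out in Lance format.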
Using the width and height attributes to add a boolean label, big_pic: 4 TB / 300 CU, time cost 1.5 minutes.
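The labeling step can be sketched as a batch function for Ray Data's `map_batches`, assuming `width`/`height` columns already exist in the dataset; the 1024-pixel threshold is a hypothetical choice, not taken from the original pipeline:

```python
def add_big_pic_label(batch, min_side=1024):
    # An image counts as big_pic when both sides reach min_side pixels;
    # the threshold here is an assumed value for illustration
    batch["big_pic"] = [
        w >= min_side and h >= min_side
        for w, h in zip(batch["width"], batch["height"])
    ]
    return batch
```

Applied as `ds.map_batches(add_big_pic_label)`, this adds the column across the dataset; because the rule only reads two scalar columns rather than the image bytes, a full pass stays cheap, which is consistent with the 1.5-minute figure.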
Offline inference scoring with Ray's distributed architecture: moving from the original 300 CU of CPU to distributed inference on 4 GPUs delivered a large performance improvement.
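One common way to structure such GPU scoring is a callable class passed to `map_batches`, so the model loads once per actor instead of once per batch. The scoring model below is a placeholder (a real pipeline would load a GPU model in `__init__`), and the `concurrency`/`num_gpus` arguments in the usage note assume a recent Ray Data API:

```python
class ImageScorer:
    """Stateful batch scorer for Ray Data; one instance lives per actor."""

    def __init__(self, model=None):
        # Placeholder model: scores each image by payload size. A real
        # pipeline would load the scoring model onto the GPU here, once.
        self.model = model or (lambda images: [float(len(img)) for img in images])

    def __call__(self, batch):
        # Ray Data calls this once per batch; add a score column per row
        batch["score"] = self.model(batch["image"])
        return batch
```

Applied as, e.g., `ds.map_batches(ImageScorer, concurrency=4, num_gpus=1, batch_size=64)`, this gives one scoring actor per GPU, matching the 4-GPU setup described above.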