在多模态数据预处理的工作流程中,通常会涉及对音频数据进行清洗、音频转文本、说话人识别、情感分析以及语言识别等一系列操作,以完成对音频数据的标注。借助这些标注后的音频数据,能够更加便捷且准确地执行后续的风险控制、音频模型训练与评估等操作。本文针对该场景基于 EMR Serverless Ray 提供一个端到端的最优实践方案。
利用 EMR Ray 的分布式文件系统接口,高效地从多种数据源(如TOS)批量导入音频数据。例如,可以通过编写简单的 Ray 任务来并行化数据读取操作,提高数据导入速度,确保大规模音频数据能够快速进入处理流程。
利用Lance的高效多模态数据存储模式,可以将音频数据与音频数据的元数据及标签存储在一起,加速数据导入与处理。
ds = ray.data.read_binary_files(dataset_path, filesystem=tos_fs)
依据音频的质量或特征进行过滤操作时,可借助 EMR Ray 引擎的 ray.data 模块,并结合音频处理库(如 libsora 等),以分布式方式处理大规模语音数据。这样能够保留高质量音频数据,为后续的语音识别及分析提供更优良的数据基础。
以下为常见的过滤指标:
音频切片是为后续精细处理奠定基础。依特定时长或语义单元划分音频,例如按语句停顿、语义主题转换处精准切割。对于长时段演讲音频,可切成段落片段,便于逐段深度剖析;对话场景则依不同说话人轮次切分,利于后续说话人识别及内容针对性处理。切片精准度关乎后续步骤准确性,需借助音频能量变化检测、语音端点检测算法锁定起止点,避免切断完整语义内容,且切片后要合理标记存储,关联上下文信息,保障处理连贯性。
EMR平台中使用ray.data模块的分布式能力和libsora的音频处理能力对音频数据进行切分。
音频数据内容识别在音频数据预处理阶段占据着至关重要的地位。这是因为识别出的文本对于后续平台的风控工作(如涉黄等)以及音频模型的训练而言,是极为关键的输入内容。
利用 EMR Ray 平台所提供的丰富多样的异构计算资源,能够为音频数据内容识别工作提供有力的支持。在这个过程当中,可以借助 GPU 所具有的强大性能优势,加载诸如 Whisper、ultravox 以及 SenseVoice 等一系列表现极为出色的语音大模型。通过这样的方式,便能够顺利地开展高效且精准度极高的语音识别工作,为音频数据的处理和应用提供坚实的基础。
在音频数据预处理阶段,会涉及到针对海量音频数据展开打标这一重要过程。具体而言,要对音频数据中的语音进行多方面的识别与分析,包括但不限于识别其情感表达、确定说话人的身份、辨别所用的语言种类,以及准确区分音频的不同种类等。对这些丰富多样的音频数据进行细致的打标操作,具有重大意义,因为这将极大地有助于后续音频大模型的训练以及精准微调。
在 EMR 平台当中,可以充分借助 Ray 所具备的强大分布式能力以及丰富的异构计算资源。同时,还能利用开源语音大模型(如 ultravox、SenseVoice 等)展现出的卓越能力,对音频数据当中的情感表达进行深入的分析和精准识别,并且将识别结果附加到音频数据的标签数据之中。
将按照上述步骤所生成的音频标签等数据与原始音频数据的元信息(其中涵盖音频文件名、时长以及来源等方面)以及音频数据本身进行充分融合,从而构建起一个多模态的数据集。通过借助 EMR Ray 的分布式存储输出接口,把处理后的结果以 lance 数据格式存储至特定的存储位置(诸如 TOS 等),这样做的目的是为了便于后续对其进行查询、分析以及开展训练工作。
import io import ray from typing import Dict import numpy as np from tosfs.core import TosFileSystem import transformers import librosa from lance.ray.sink import LanceDatasink from ray.data.dataset import Schema from ray.data._internal.pandas_block import PandasBlockSchema class TypeConverter: def __init__(self, dataset_schema: Schema, required_schema: pa.lib.Schema = None,) -> None: self.field_names = dataset_schema.names if required_schema is not None: self.schema = required_schema elif isinstance(dataset_schema.base_schema, pa.lib.Schema): self.schema = required_schema elif isinstance(dataset_schema.base_schema, PandasBlockSchema): self.schema = self.speculation_type(dataset_schema) else: raise ValueError( f"Unsupported schema type: {type(dataset_schema.base_schema)}") def __call__(self, batches) -> pa.Table: columns = {} for field_name in self.field_names: column_data = batches[field_name] if (field_name == "json"): parsed_json = json.loads(column_data[0].decode("utf-8")) for key in parsed_json.keys(): if key in self.schema.names: columns[key] = [json.loads(item.decode( "utf-8"))[key] for item in column_data] else: columns[field_name] = column_data return pa.Table.from_pydict(columns, schema=self.schema) class AudioPreprocessor: """ Preprocesses audio assets for the given model. """ def __init__(self, model_name: str): def __call__(self, audio_batches): audio_batches["time"] = [] for audio_bytes in audio_batches["bytes"]: # 将字节流转换为字节缓冲区 audio_buffer = io.BytesIO(audio_bytes) # 使用librosa从字节缓冲区加载音频 waveform, sample_rate = librosa.load(audio_buffer, sr=None) # 获取音频时长 duration = librosa.get_duration(y=waveform, sr=sample_rate) audio_batches["time"].append(duration) return audio_batches model_name = "fixie-ai/ultravox-v0_3" class AudioPredictor: """ Predicts the score of the given audio assets. """ def __init__(self, model_name: str): self.model_name = model_name self.pipe = transformers.pipeline(model=model_name, trust_remote_code=True) # 定义对话上下文 self.turns = ( { "role": "system", "content": "输出该音频的内容,输出音频的说话人身份,年龄,语气,以json格式输出,格式为:{\"text\": \"音频内容\", \"speaker\": \"说话人身份\", \"mood\": \"语气\"},其中text为音频内容,speaker为说话人身份,mood为语气。" }, ) def __call__(self, audio_batches): audio_batches["text"] = [] audio_batches["speaker"] = [] audio_batches["mood"] = [] for audio_bytes in audio_batches["bytes"]: # 将字节流转换为字节缓冲区 audio_buffer = io.BytesIO(audio_bytes) # 使用librosa从字节缓冲区加载音频 waveform, sample_rate = librosa.load(audio_buffer, sr=None) # 使用Ultravox处理音频输入并生成响应 response = self.pipe({'audio': waveform, 'turns': self.turns, 'sampling_rate': sample_rate}, max_new_tokens=30) try : value = json.loads(response) except json.decoder.JSONDecodeError as e: value = {"text":"","speaker":"","mood":""} audio_batches["text"].append(value.get('text', '')) audio_batches["speaker"].append(value.get('speaker', '')) audio_batches["mood"].append(value.get('mood', '')) return audio_batches dataset_path = "tos://{bucket_name}/dataset/audio/" tos_fs = TosFileSystem( key='{AK}', secret='{SK}', #请填写实际endpoint endpoint_url='https://tos-cn-beijing.ivolces.com', region='cn-beijing', socket_timeout=60, connection_timeout=60, max_retry_num=30 ) # load audio data ds = ray.data.read_binary_files(dataset_path, filesystem=tos_fs, include_paths=True) # preprocess audio data, filte audio data which is too long ds = ds.map_batches(AudioPreprocessor, fn_constructor_args=()) # filte out < 3s audios ds = ds.filter(lambda x: x["time"] > 3) # predict audio data, predict text of each audio data ds = ds.map_batches(AudioPredictor, fn_constructor_args=(), num_gpus=1) # save result to tos # Create a schema for the Lance dataset import pyarrow as pa schema = pa.schema([("bytes", pa.binary()), ("time", pa.int()), ("text", pa.string()), ("speaker", pa.string()), ("mood", pa.string())]) # 类型转换 ds = ds.map_batches(TypeConverter, fn_constructor_args=( ds.schema(), schema), batch_size=10, num_cpus=1, concurrency=30) # 数据写入lance # 写入lance storage_options = { "access_key_id": "{access key}", "secret_access_key": "{secret access key}", #请根据用户实际地域填写region和endpoint "aws_region": "cn-beijing", "aws_endpoint": "https://tos-cn-beijing.ivolces.com", "virtual_hosted_style_request": "true" } sink = LanceDatasink( "tos://bucket_name/dataset/audio_lance", schema=schema, max_rows_per_file=2000, # 可再调小一点,增加后续处理的并发 storage_options=storage_options, mode="overwrite") ds.write_datasink(sink)
# Ray任务参数 set las.query.engine.type = RayJob; set serverless.ray.version = 2.39; set serverless.ray.image = image-repo-cn-beijing.cr.volces.com/test/ray-ml:2.39.0-cu12.2.2-py3.11-ubuntu22.04-365 set serverless.ray.head.cpu = 4; set serverless.ray.head.memory = 16Gi; set serverless.ray.worker.cpu = 4; set serverless.ray.worker.memory = 16Gi; set serverless.ray.worker.replicas = 10; # tos文件路径 set serverless.ray.entrypoint.bundle.path = tos://dujl/audio-demo.zip; set serverless.ray.entrypoint.cmd = python /home/ray/workdir/audio-inference-demo.py; # GPU配置相关参数 # 指定GPU节点池 set serverless.ray.node.selector.las.private.spark.queue.node.label=gpu-pool; set serverless.ray.burst.vci=false; # head节点参数 set serverless.ray.head.annotation.volcano.sh/burst-to-vci=none; set serverless.ray.head.gpu.vendor=nvidia.com; set serverless.ray.head.gpu.amount=0; # work节点参数 set serverless.ray.worker.annotation.volcano.sh/burst-to-vci=none; set serverless.ray.worker.gpu.vendor=nvidia.com; set serverless.ray.worker.gpu.amount=1;
目前EMR Serverless Ray提供的GPU卡类型有A10与L20,其他种类的GPU卡也在添加中。详细参数请参考:EMR Serverless GPU参数说明。