本文帮助您了解如何通过 API 使用批量推理。相比在线推理,显著提升处理吞吐量并降低使用成本,非常适合无需即时响应的任务。
说明
为充分发挥批量推理高配额优势,使用 Batch Chat 时,建议保持高并发推压,如创建大量(如10万个)线程或者协程并发请求,并配置较大超时时间,保障任务的完成率,示例代码供参考。
- 如您希望通过控制台操作,请参见文档 批量推理。
- 如您希望直接查看API文档,请参见 批量(Chat) API、批量(Job) API。
当前批量推理功能支持 深度思考能力、文本生成能力、视觉理解能力的模型及基于基础模型精调后的模型(精调后模型支持情况请以批量推理控制台可选模型为准)。
方舟会开放更多模型,如您近期有其他模型批量推理的需求,请提交工单onCall与我们沟通。
详细费用说明请参见模型服务价格。
方舟提供了2种批量推理方式,您可根据自己的业务场景选择合适的调用方式。
方式 | Batch Job(创建批量推理任务) | Batch Chat(调用批量推理接入点) |
---|---|---|
工作方式 | 使用
| 调用接口将处理任务推送给平台后,一段时间后(时间长短受资源影响),方舟平台会返回结果。 |
应用场景 |
|
|
核心流程 |
|
|
优势 |
|
|
劣势 | 存在工程改造量,可能涉及数据上传、分片、切分目录等环节改造。 |
|
doubao-pro-32k-***
所有版本)汇总计算。举例说明:您主账号下,doubao-pro-32k 模型,a、b、c 3个的批量推理任务,d、e 2个在线推理任务。则您每天a、b、c 批量推理共用 10 B token 配额,d 、e 共用在线推理任务 TPM、RPM 限流额度。
您可以用极小的代价,将业务从在线推理迁移至批量推理,则可以使用 Batch Chat 的方式实现批量推理。
查看接口字段详细说明见对话(Chat)-批量 API。
- 如您使用 Access Key 鉴权,可参见 Access Key 签名鉴权。
- 为了更安全使用 API Key / Access Key,建议您将 API Key / Access Key 配置到环境变量中。
您可以使用最小化示例代码快速发起一个批量推理请求。
与在线推理不同的是,您需使用 Batch Chat 接口,并替换<Model>
为您的批量推理接入点 ID。
import os from volcenginesdkarkruntime import Ark # 从环境变量中读取您的方舟API Key client = Ark( api_key=os.environ.get("ARK_API_KEY"), # 1小时超时 timeout=3600, ) # 使用batch_chat.completions接口 completion = client.batch_chat.completions.create( # 替换 <Model>为批量推理接入点 ID model="<MODEL>", messages=[ {"role": "user", "content": "你好"} ] ) print(completion.choices[0].message.content)
为了充分发挥批量推理任务高配额的优势,建议保持高并发推压,如 10万+线程/协程(Python 语言存在线程开销较大问题,推荐使用如 asyncio 异步协程,或多进程方案优化性能),多机部署扩展等策略。同时为了保证请求完成率,需要配置一个较大的超时时间,各语言实现的示例代码供您参考。
为了提高任务请求完成比例,避免服务端高负载时,大量请求排队中超时,推荐您手动配置一个较大的超时时间(取值在1小时到72小时)。推荐配置24小时~72小时,无需您手动配置重试策略,方舟会智能地进行任务调度,保障您任务的完成率。
超时时间对应
timeout
字段,代表着单个请求的超时时间。请放心设置,较高的超时时间并不会对配额有影响。若设置较短的超时时间,在需要较长时间回复的场景(如深度思考模型输出思维链内容,回复篇幅更长、速率更慢),任务处理中途超时断开连接,既浪费您的 token 成本,又无法输出内容。
方舟客户端会在超时时间范围内自动重试请求,或者按照方舟服务端要求暂停若干时间后重试。在方舟服务端,则根据负载情况对请求做出暂不处理、排队、指定客户端若干时间后重试等操作。
... client = AsyncArk( api_key=os.environ.get("ARK_API_KEY"), # 使用异步接口并配置较高的超时时间,推荐24小时~72小时 timeout=3600*24, ) ...
方舟为每个主账户提供至少10B token/天的配额,您可通过请求并发数及副本数的配置,将大量任务提交到方舟平台处理。方舟平台会根据业务负载,为您智能调度执行任务,甚至可在低负载时,为您提供超出基础配额的处理能力。
请求并发数
workerNum
表示 向方舟服务端发起请求时的最大并发数。它决定了在同一时间内,业务能够向服务端发起请求的最大数量。
在创建 client 之后,且在向方舟服务端发起请求之前,需要启动 client 的相关线程。
对于 Java 客户端,由于其自带了线程池,因此无需额外手动启动线程。根据不同的编程语言,调用相应的方法会有所不同。为确保准确调用,请参照示例代码调用。
配置workerNum
,推荐业务综合考虑业务预期的最大峰值、平均时延、机器并发上限等因素,来确定请求并发数。下面是个简单例子,供您参考如何设定请求并发数及副本数。
说明
业务预期的最大峰值QPS为5000,单个请求的平均延时为5秒,部署在2C4G的机器或容器上。
关键的指标
5000*5 = 25000
副本数和并发数配置
如果需要应对业务峰值时的业务量,可部署更多副本(推荐)或提升服务器规格(不推荐):
业务QPS/服务器限制=5000/1000=5
。workerNum
(每个副本上设置并发数):服务器限制*请求平均延时=1000*5=5000
。Doubao-1.5-vision-pro-32k
进行批量推理。传入图片的方式支持传入图片的方式支持 TOS(火山引擎对象存储)链接,其他可访问链接以及 Base64 编码,传入方式同视觉理解 图片传入方式。... client = tos.TosClientV2(ak, sk, endpoint, region) # 生成上传文件的签名 url,设置 url 有效时间为7天 out = client.pre_signed_url(tos.HttpMethodType.Http_Method_Put, bucket=bucket_name, key=object_key, expires=604800) ...
如Go SDK中Client的
StartBatchWorker
方法会创建workNum
个goroutine,避免创建大量goroutine带来的资源开销及浪费。
相比多线程方式,异步协程的方式更优:
asyncio
可以立即切换到其他任务继续执行,避免了线程切换的开销,从而提高了整体的并发性能。说明
批量推理属于标准的高 io/低计算场景,如您系统未使用的同步 API 适配的场景,建议使用异步版,大部分场景用异步协程能获得更好的效果。
# 导入异步编程相关库,用于处理异步任务,异步编程中协程是核心概念 import asyncio # 导入系统相关库,可用于与 Python 解释器和系统进行交互,如标准错误输出 import sys # 导入操作系统相关库,用于访问操作系统的功能,如读取环境变量 import os # 导入日期时间处理库,用于记录和计算程序的执行时间 from datetime import datetime # 导入火山引擎异步 Ark 客户端库,用于与火山引擎的 Ark 服务进行异步通信 from volcenginesdkarkruntime import AsyncArk async def worker( # asyncio task 的协程唯一标识,用于区分不同的工作协程 worker_id: int, # 异步 Ark 客户端实例,用于调用批量聊天完成接口处理请求 client: AsyncArk, # 待处理请求的队列,存储需要处理的请求 requests: asyncio.Queue[dict], ): """ 异步协程函数,负责从队列中获取请求并处理。 :param worker_id: 协程的唯一标识,用于在日志中区分不同的协程 :param client: 异步 Ark 客户端实例,通过该实例调用服务接口处理请求 :param requests: 待处理请求的队列,存储待处理的请求信息 """ # 打印协程启动信息 print(f"Worker {worker_id} is starting.") while True: # 从队列中获取一个请求,若队列为空则会阻塞等待 # 这里的 await 关键字用于暂停协程的执行,等待队列中有元素可供获取 request = await requests.get() try: # 调用客户端的批量聊天完成接口处理请求,使用解包操作将请求字典作为参数传递 # 同样使用 await 关键字暂停协程,等待接口调用完成 completion = await client.batch_chat.completions.create(**request) # 打印处理结果 print(completion) except Exception as e: # 若处理请求过程中出现异常,将错误信息打印到标准错误输出 print(e, file=sys.stderr) finally: # 标记该请求已处理完成,通知队列该任务已结束 requests.task_done() async def main(): """ 主函数,负责初始化客户端、生成请求、启动协程并监控任务完成情况。 使用协程来实现并发处理请求,避免了使用线程带来的较大开销。 多个协程可以在一个线程中并发执行,提高程序的性能。 """ # 记录程序开始执行的时间 start = datetime.now() # 定义协程数量和任务数量 worker_num, task_num = 1000, 10000 # 创建一个异步队列用于存储请求,该队列支持异步操作 requests = asyncio.Queue() # 初始化异步 Ark 客户端 client = AsyncArk( # 从环境变量中获取 API 密钥,确保密钥的安全性 api_key=os.environ.get("ARK_API_KEY"), # 设置超时时间为24小时,建议超时时间设置尽量大些,推荐24小时~72小时,避免因网络等原因导致请求超时 timeout=24 * 3600, ) # 模拟 `task_num` 个任务,将请求信息添加到队列中 for _ in range(task_num): await requests.put( { # 替换为你的批量推理接入点 ID,指定要调用的服务端点 "model": "<YOUR_ENDPOINT_ID>", "messages": [ { # 系统角色消息,用于设置对话的初始信息 "role": "system", "content": "你是豆包,是由字节跳动开发的 AI 人工智能助手", }, { # 用户角色消息,包含用户的具体问题 "role": "user", "content": "常见的十字花科植物有哪些?" } ] } ) # 创建 `worker_num` 个 asyncio task 的协程,并启动它们,每个协程负责处理队列中的请求 # 这些协程会在一个线程中并发执行,通过协程的切换来提高效率 tasks = [ asyncio.create_task(worker(i, client, requests)) for i in range(worker_num) ] # 等待所有请求处理完成,即队列中的所有任务都被标记为已完成 await requests.join() # 停止所有协程,取消所有正在运行的任务 for task in tasks: task.cancel() # 等待所有协程取消完成,确保所有任务都已停止 await asyncio.gather(*tasks, return_exceptions=True) # 关闭客户端连接,释放资源 await client.close() # 记录程序结束执行的时间 end = datetime.now() # 打印程序总执行时间和处理的总任务数 print(f"Total time: {end - start}, Total task: {task_num}") if __name__ == "__main__": # 运行异步主函数,启动整个程序 # asyncio.run() 会创建一个事件循环,并在这个事件循环中运行主协程 asyncio.run(main())
通过控制台创建批量推理任务,请参见批量推理任务。
注意
doubao-1.5-vision-pro-32k
,传入图片的方式支持 TOS链接(推荐)、其他可访问图片链接、 Base64 编码,与视觉理解 图片传入方式相同。... client = tos.TosClientV2(ak, sk, endpoint, region) # 生成上传文件的签名 url,设置 url 有效时间为7天 out = client.pre_signed_url(tos.HttpMethodType.Http_Method_Put, bucket=bucket_name, key=object_key, expires=604800) ...
您可以通过快速入门示例代码实现批量推理任务的创建、查询、下载结果。
请您参考 数据文件格式说明,准备数据格式文件,文件为jsonl
格式,其中每行包含对 API 的单个请求的详细信息。
以下是包含 2 个请求的输入文件的示例。
{"custom_id": "request-1", "body": {"messages": [{"role": "user", "content": "天空为什么这么蓝?"}],"max_tokens": 1000,"top_p":1,"temperature":0.7}} {"custom_id": "request-2", "body": {"messages": [{"role": "system", "content": "You are an unhelpful assistant."},{"role":"user", "content":"天空为什么这么蓝?"}],"max_tokens":1000}}
说明:
custom_id
值,后续在结果文件中可以通过custom_id
来查询对应请求的结果。body
遵循 批量(Chat) API 请求体结构以及字段取值。doubao-1.5-vision-pro-250428
,传入图片的方式只支持 TOS(火山引擎对象存储)链接,不支持其他链接以及 Base64 编码。输入文件示例如下(多图和单图的输入文件示例)。您需要上传任务文件至 TOS 的bucket中,后续方舟平台会读取文件里的请求信息来进行批量推理。下面是上传文件的简单脚本。
如果您的任务文件较大,可以参考教程改造为分片上传。
运行脚本前,您需在配置您的桶信息等任务信息填入其中,将您的Access Key配置到环境变量
VOLC_ACCESSKEY
、VOLC_SECRETKEY
中(配置方法可参考文档)。
import os import tos # 从环境变量中读取您的Access Key,为了保障安全,建议您配置Access Key至环境变量 ak = os.environ.get("VOLC_ACCESSKEY") sk = os.environ.get("VOLC_SECRETKEY") # 存储任务文件及结果文件的桶的属性,包括ENDPOINT以及地域 endpoint = "tos-cn-beijing.volces.com" region = "cn-beijing" bucket_name = "input-bucket" # 上传任务文件的文件路径 object_key = "input/data.jsonl" # 任务文件本地的路径 file_name = "/usr/local/data.jsonl" # 初始化 TOS 客户端 client = tos.TosClientV2(ak, sk, endpoint, region) # 上传文件 client.put_object_from_file(bucket_name, object_key, file_name)
您可以调用接口CreateBatchInferenceJob - 创建批量推理任务来配置并创建批量推理任务,包括读入任务文件的路径、输出结果的路径、使用模型等等信息。
运行脚本前,您需在配置项类
Config
中填入您的任务信息,同时将您的Access Key配置到环境变量VOLC_ACCESSKEY
、VOLC_SECRETKEY
中(配置方法可参考文档)。
import volcenginesdkcore import volcenginesdkark import os # 配置项类,您需将其改为自己的配置 class Config: # 存储任务文件及结果文件的桶的属性,包括ENDPOINT以及地域 REGION = "cn-beijing" INPUT_BUCKET = "input-bucket" # 上传任务文件的文件路径 INPUT_OBJECT_KEY = "input/data.jsonl" OUTPUT_BUCKET = "output-bucket" # 存储结果文件的文件夹 OUTPUT_OBJECT_KEY = "output/" # 任务文件本地的路径 LOCAL_FILE_NAME = "/usr/local/data.jsonl" # 模型版本及名称,可在此查询 https://www.volcengine.com/docs/82379/1330310 MODEL_VERSION = "250115" MODEL_NAME = "doubao-1-5-pro-32k" # 命名您的批量推理任务 JOB_NAME = "demo" # 批量推理任务所在项目的名称,默认为default PROJECT_NAME = "default" # 从环境变量中读取您的Access Key,为了保障安全,建议您配置Access Key至环境变量 AK = os.environ.get("VOLC_ACCESSKEY") SK = os.environ.get("VOLC_SECRETKEY") # 创建批量推理任务 def create_batch_job(ark_instance): input_file_tos_location = ( volcenginesdkark.InputFileTosLocationForCreateBatchInferenceJobInput( bucket_name=Config.INPUT_BUCKET, object_key=Config.INPUT_OBJECT_KEY ) ) output_dir_tos_location = ( volcenginesdkark.OutputDirTosLocationForCreateBatchInferenceJobInput( bucket_name=Config.OUTPUT_BUCKET, object_key=Config.OUTPUT_OBJECT_KEY ) ) foundation_model = volcenginesdkark.FoundationModelForCreateBatchInferenceJobInput( model_version=Config.MODEL_VERSION, name=Config.MODEL_NAME ) model_reference = volcenginesdkark.ModelReferenceForCreateBatchInferenceJobInput( foundation_model=foundation_model ) req = volcenginesdkark.CreateBatchInferenceJobRequest( input_file_tos_location=input_file_tos_location, model_reference=model_reference, name=Config.JOB_NAME, output_dir_tos_location=output_dir_tos_location, project_name=Config.PROJECT_NAME, ) resp = ark_instance.create_batch_inference_job(req) return resp.id configuration = volcenginesdkcore.Configuration() configuration.ak = Config.AK configuration.sk = Config.SK configuration.region = Config.REGION configuration.client_side_validation = True volcenginesdkcore.Configuration.set_default(configuration) ark_instance = volcenginesdkark.ARKApi(volcenginesdkcore.ApiClient(configuration)) # 创建批量推理任务 batch_job_id = create_batch_job(ark_instance) print(f"创建批量推理任务,任务ID:{batch_job_id}")
请求将返回创建的批量推理任务的 ID。
创建批量推理任务,任务ID:bi-20250305220634-****
您可随时检查任务的状态,包括通过任务id、模型、运行状态等进行筛选,具体筛选方式可见ListBatchInferenceJobs - 获取批量推理任务列表。
import os import volcenginesdkcore import volcenginesdkark class Config: # 存储任务文件及结果文件的桶的属性,包括ENDPOINT以及地域 REGION = "cn-beijing" # 从环境变量中读取您的Access Key,为了保障安全,建议您配置Access Key至环境变量 AK = os.environ.get("VOLC_ACCESSKEY") SK = os.environ.get("VOLC_SECRETKEY") # 替换为您创建的批量推理任务的 ID BATCH_JOB_ID="bi-2025030****" # 列出指定 ID 的批量推理任务信息。 def list_batch_inference_jobs(ark_instance, batch_job_id): filter = volcenginesdkark.FilterForListBatchInferenceJobsInput(ids=[batch_job_id]) req = volcenginesdkark.ListBatchInferenceJobsRequest(filter=filter) resp = ark_instance.list_batch_inference_jobs(req) return resp configuration = volcenginesdkcore.Configuration() configuration.ak = Config.AK configuration.sk = Config.SK configuration.region = Config.REGION configuration.client_side_validation = True volcenginesdkcore.Configuration.set_default(configuration) ark_instance = volcenginesdkark.ARKApi(volcenginesdkcore.ApiClient(configuration)) batch_job_status = list_batch_inference_jobs(ark_instance, Config.BATCH_JOB_ID) print(f"批量推理任务信息:\n{batch_job_status}")
运行脚本,将返回匹配筛选条件的任务的信息。
批量推理任务信息: {'items': [{'completion_window': '28d', 'create_time': '2025-03-05T14:06:34Z', 'description': '', 'expire_time': '2025-04-02T14:06:34Z', 'id': 'bi-20250305220634-****', 'input_file_tos_location': {'bucket_name': 'input-bucket', 'object_key': 'input/data.jsonl'}, 'model_reference': {'custom_model_id': None, 'foundation_model': {'model_version': '250115', 'name': 'doubao-1-5-pro-32k'}}, 'name': 'demo', 'output_dir_tos_location': {'bucket_name': 'output-bucket', 'object_key': 'output/'}, 'project_name': 'default', 'request_counts': {'completed': 2, 'failed': 0, 'total': 2}, 'status': {'message': '', 'phase': 'Completed', 'phase_time': '2025-03-05T15:07:47Z'}, 'tags': [{'key': 'sys:ark:createdBy', 'value': '****'}], 'update_time': '2025-03-05T15:07:47Z'}], 'page_number': 1, 'page_size': 10, 'total_count': 1}
批量推理任务状态有以下几种:
状态 | 描述 |
---|---|
Queued | 任务提交成功,排队执行中 |
Running | 任务运行中 |
Completed | 任务已完成 |
Terminating | 任务在停止中。 |
Terminated | 任务已被停止。 |
Failed | 任务执行失败,原因可能是超时等原因。 |
查询到批量推理任务完成后,您可以通过接口下载执行批量推理任务后的文件,包括2个文件:
custom_id
来查询。import os import tos # 替换为您的批量推理任务 ID batch_job_id="bi-20250305220634-****" # 从环境变量中读取您的Access Key,为了保障安全,建议您配置Access Key至环境变量 ak = os.environ.get("VOLC_ACCESSKEY") sk = os.environ.get("VOLC_SECRETKEY") # 替换为您存储桶的属性,包括 endpoint、region、bucket_name endpoint = "tos-cn-beijing.volces.com" region = "cn-beijing" bucket_name = "output-bucket" # 替换为您配置的批量推理任务结果文件路径 output_object_key = "output/" # 替换为您存储批量推理任务结果文件的本地路径 local_output_dir = "./output" results_key = f"{output_object_key}{batch_job_id}/output/results.jsonl" errors_key = f"{output_object_key}{batch_job_id}/output/errors.jsonl" results_file_name = f"{local_output_dir}{batch_job_id}/results.jsonl" errors_file_name = f"{local_output_dir}{batch_job_id}/errors.jsonl" client = tos.TosClientV2(ak, sk, endpoint, region) try: client.get_object_to_file(bucket_name, results_key, results_file_name) except: print("results.jsonl not exist or other error") try: client.get_object_to_file(bucket_name, errors_key, errors_file_name) except: print("errors.jsonl not exist or other error")
您可以配置的local_output_dir
文件路径下,找到批量推理任务结果文件。
{"id":"02174118435223600000000000000000000ffffac1520039ca4e1","custom_id":"request-1","error":null,"response":{"request_id":"02174118435223600000000000000000000ffffac1520039ca4e1","status_code":200,"body":{"id":"02174118435223600000000000000000000ffffac1520039ca4e1","object":"chat.completion","created":1741184547,"model":"doubao-1-5-pro-32k-250115","choices":[{"index":0,"message":{"role":"assistant","content":"天空呈现蓝色主要是由于瑞利散射现象,以下为你详细解释:\n\n### 太阳光的组成\n太阳光其实是由多种不同颜色的光混合而成的,这些光按照波长从长到短依次为红、橙、黄、绿、蓝、靛、紫,也就是我们常说的七色光。\n\n### 大气的组成\n地球的大气层主要由氮气、氧气以及其他微量气体分子和微小颗粒组成。当太阳光进入大气层时,就会与这些气体分子发生相互作用。\n\n### 瑞利散射过程\n根据瑞利散射定律,散射光的强度与光的波长的四次方成反比。也就是说,波长越短的光,越容易被散射。\n\n在太阳光包含的各种颜色光中,蓝光的波长相对较短(约450 - 495纳米)。当太阳光进入地球大气层后,蓝光就很容易被大气中的气体分子散射到各个方向。所以,在我们向天空望去时,这些被散射到四面八方的蓝光就会进入我们的眼睛,从而让我们感觉整个天空都是蓝色的。\n\n### 影响天空蓝色深浅的因素\n在不同的时间和地点,大气中水汽、尘埃等颗粒物的含量有所不同,这会一定程度上改变大气对蓝光散射的效果。当空气中的颗粒物较多时,更多的蓝光被散射和吸收,天空可能呈现浅蓝色甚至有点发白;而在空气较为纯净的地区,比如海边、高原,散射效果更明显,天空看起来会更湛蓝。 "},"finish_reason":"stop"}],"usage":{"prompt_tokens":13,"completion_tokens":322,"total_tokens":335,"prompt_tokens_details":{"cached_tokens":0},"completion_tokens_details":{"reasoning_tokens":0}},"HttpHeader":{"X-Request-Id":["02174118435223600000000000000000000ffffac1520039ca4e1"],"X-Client-Request-Id":["2025030522191200000CA2C7FE1C9A9065"],"Vary":["Accept-Encoding"],"Server":["hertz"],"Date":["Wed, 05 Mar 2025 14:22:26 GMT"],"Content-Type":["application/json; charset=utf-8"]}}}} {"id":"02174118707871600000000000000000000ffffac152003cadb62","custom_id":"request-2","error":null,"response":{"request_id":"02174118707871600000000000000000000ffffac152003cadb62","status_code":200,"body":{"id":"02174118707871600000000000000000000ffffac152003cadb62","object":"chat.completion","created":1741187249,"model":"doubao-1-5-pro-32k-250115","choices":[{"index":0,"message":{"role":"assistant","content":"我不想给你好好解释这个问题。天空蓝不蓝关我啥事,你自己去想吧。 "},"finish_reason":"stop"}],"usage":{"prompt_tokens":25,"completion_tokens":24,"total_tokens":49,"prompt_tokens_details":{"cached_tokens":0},"completion_tokens_details":{"reasoning_tokens":0}},"HttpHeader":{"X-Client-Request-Id":["202503052304380000BACD2ADACB18921B"],"Vary":["Accept-Encoding"],"Server":["hertz"],"Date":["Wed, 05 Mar 2025 15:07:29 GMT"],"Content-Type":["application/json; charset=utf-8"],"X-Request-Id":["02174118707871600000000000000000000ffffac152003cadb62"]}}}}
如您业务处理任务较多,可按需申请限额提升。
您可参考文档批量推理任务最佳实践。