对于较大的对象,可以分成多个数据块(part)来分别上传,最后调用合并分片将上传的数据块合并为一个对象。
tos:PutObject
权限,具体操作,请参见权限配置指南。tos:AbortMultipartUpload
权限,具体操作,请参见权限配置指南。分片上传一般包含以下三个步骤。
说明
upload_id
标识),分片编号(part_number
)标识了该分片在整个对象中的相对位置。若使用同一分片编号多次上传数据,TOS 会覆盖原有数据,并以最后一次上传的数据为准。以下代码通过分片上传将本地文件上传到目标桶 bucket-test
中的 object-test
对象。
# Sample: upload a local file to a TOS bucket with the multipart upload API.
# Steps: create the multipart upload, upload the file part by part, then
# complete the upload. If anything fails after the upload was created, the
# upload is aborted so that already-uploaded parts do not linger in the bucket.
import os

import tos
from tos.utils import SizeAdapter

# Read the AK and SK from environment variables.
ak = os.getenv('TOS_ACCESS_KEY')
sk = os.getenv('TOS_SECRET_KEY')
# Set endpoint and region to the values of the region the bucket lives in.
# For China North 2 (Beijing), endpoint is tos-cn-beijing.volces.com and
# region is cn-beijing.
endpoint = "your endpoint"
region = "your region"
bucket_name = "bucket-test"
# Object name; for the file example_object.txt under example_dir, use
# example_dir/example_object.txt.
object_key = "object-test"
# Full path of the local file, for example /usr/local/testfile.txt.
file_name = '/usr/local/testfile.txt'
total_size = os.path.getsize(file_name)
# Size of each part: 5 MiB (the last part may be smaller).
part_size = 5 * 1024 * 1024

client = None
upload_id = None    # set once the multipart upload has been created
completed = False   # set once complete_multipart_upload has returned
try:
    # Create the TosClientV2 object; all bucket and object operations go
    # through it.
    client = tos.TosClientV2(ak, sk, endpoint, region)
    # Create the multipart upload.
    # The storage class of the object can be set here through storage_class.
    # The object ACL can be set here through acl, grant_full_control, etc.
    multi_result = client.create_multipart_upload(
        bucket_name,
        object_key,
        acl=tos.ACLType.ACL_Public_Read,
        storage_class=tos.StorageClassType.Storage_Class_Standard,
    )
    upload_id = multi_result.upload_id

    # Upload the part data.
    parts = []
    with open(file_name, 'rb') as f:
        part_number = 1
        offset = 0
        while offset < total_size:
            num_to_upload = min(part_size, total_size - offset)
            # SizeAdapter is given the open file, the byte count of this part
            # and the offset at which the part starts.
            out = client.upload_part(
                bucket_name,
                object_key,
                upload_id,
                part_number,
                content=SizeAdapter(f, num_to_upload, init_offset=offset),
            )
            parts.append(out)
            offset += num_to_upload
            part_number += 1

    # Complete the multipart upload.
    client.complete_multipart_upload(bucket_name, object_key, upload_id, parts)
    completed = True
except tos.exceptions.TosClientError as e:
    # The operation failed with a client-side error, usually an invalid
    # request parameter or a network problem.
    print('fail with client error, message:{}, cause: {}'.format(e.message, e.cause))
except tos.exceptions.TosServerError as e:
    # The operation failed with a server-side error; details are available
    # on the exception.
    print('fail with server error, code: {}'.format(e.code))
    # The request id pinpoints the failing request; keeping it in the logs is
    # strongly recommended.
    print('error with request id: {}'.format(e.request_id))
    print('error with message: {}'.format(e.message))
    print('error with http code: {}'.format(e.status_code))
    print('error with ec: {}'.format(e.ec))
    print('error with request url: {}'.format(e.request_url))
except Exception as e:
    print('fail with unknown error: {}'.format(e))
finally:
    # This sample has no resume logic, so an upload that was created but not
    # completed is aborted here. The abort is best-effort: a failure is
    # reported but must not hide the original error.
    if upload_id is not None and not completed:
        try:
            client.abort_multipart_upload(bucket_name, object_key, upload_id)
        except Exception as abort_error:
            print('fail to abort multipart upload {}: {}'.format(upload_id, abort_error))
说明
对于字符串、Bytes 和本地文件等形式的数据,支持进度条功能。网络流等类型数据无法获取上传内容的大小,因此回调时不会返回对象总体大小。
以下代码用于向分片上传过程中添加进度条功能。
# Sample: multipart upload of a local file with a progress callback.
# The per-part progress is aggregated through MergeProcess so the callback
# reports progress for the whole object. If anything fails after the upload
# was created, the upload is aborted so uploaded parts do not linger.
import os

import tos
from tos import DataTransferType
from tos.utils import SizeAdapter, MergeProcess

# Read the AK and SK from environment variables.
ak = os.getenv('TOS_ACCESS_KEY')
sk = os.getenv('TOS_SECRET_KEY')
# Set endpoint and region to the values of the region the bucket lives in.
# For China North 2 (Beijing), endpoint is tos-cn-beijing.volces.com and
# region is cn-beijing.
endpoint = "your endpoint"
region = "your region"
bucket_name = "bucket-test"
# Object name; for the file example_object.txt under example_dir, use
# example_dir/example_object.txt.
object_key = "object-test"
# Full path of the local file, for example /usr/local/testfile.txt.
file_name = '/usr/local/testfile.txt'
total_size = os.path.getsize(file_name)
# Size of each part: 5 MiB (the last part may be smaller).
part_size = 5 * 1024 * 1024


def percentage(consumed_bytes: int, total_bytes: int, rw_once_bytes: int, type: DataTransferType):
    """Print the transfer progress; the percentage is computed only when total_bytes is non-zero."""
    # NOTE(review): the parameter name `type` shadows the builtin; it is kept
    # in case the SDK passes callback arguments by keyword - confirm.
    if total_bytes:
        rate = int(100 * float(consumed_bytes) / float(total_bytes))
        print("rate:{}, consumed_bytes:{},total_bytes:{}, rw_once_bytes:{}, type:{}".format(
            rate, consumed_bytes, total_bytes, rw_once_bytes, type))


client = None
upload_id = None    # set once the multipart upload has been created
completed = False   # set once complete_multipart_upload has returned
try:
    # Create the TosClientV2 object; all bucket and object operations go
    # through it.
    client = tos.TosClientV2(ak, sk, endpoint, region)

    # Configure the progress callback. Unlike a simple upload, the progress
    # of the individual parts has to be aggregated. The third argument is the
    # number of parts (total_size / part_size, rounded up).
    # NOTE(review): the meaning of the trailing 0 is not visible here -
    # confirm against the MergeProcess documentation.
    data_transfer_listener = MergeProcess(
        percentage, total_size, (total_size + part_size - 1) // part_size, 0)

    # Create the multipart upload.
    # The storage class of the object can be set here through storage_class.
    # The object ACL can be set here through acl, grant_full_control, etc.
    multi_result = client.create_multipart_upload(
        bucket_name,
        object_key,
        acl=tos.ACLType.ACL_Public_Read,
        storage_class=tos.StorageClassType.Storage_Class_Standard,
    )
    upload_id = multi_result.upload_id

    # Upload the part data.
    parts = []
    with open(file_name, 'rb') as f:
        part_number = 1
        offset = 0
        while offset < total_size:
            num_to_upload = min(part_size, total_size - offset)
            # SizeAdapter is given the open file, the byte count of this part
            # and the offset at which the part starts.
            out = client.upload_part(
                bucket_name,
                object_key,
                upload_id,
                part_number,
                content=SizeAdapter(f, num_to_upload, init_offset=offset),
                data_transfer_listener=data_transfer_listener,
            )
            parts.append(out)
            offset += num_to_upload
            part_number += 1

    # Complete the multipart upload.
    client.complete_multipart_upload(bucket_name, object_key, upload_id, parts)
    completed = True
except tos.exceptions.TosClientError as e:
    # The operation failed with a client-side error, usually an invalid
    # request parameter or a network problem.
    print('fail with client error, message:{}, cause: {}'.format(e.message, e.cause))
except tos.exceptions.TosServerError as e:
    # The operation failed with a server-side error; details are available
    # on the exception.
    print('fail with server error, code: {}'.format(e.code))
    # The request id pinpoints the failing request; keeping it in the logs is
    # strongly recommended.
    print('error with request id: {}'.format(e.request_id))
    print('error with message: {}'.format(e.message))
    print('error with http code: {}'.format(e.status_code))
    print('error with ec: {}'.format(e.ec))
    print('error with request url: {}'.format(e.request_url))
except Exception as e:
    print('fail with unknown error: {}'.format(e))
finally:
    # This sample has no resume logic, so an upload that was created but not
    # completed is aborted here. The abort is best-effort: a failure is
    # reported but must not hide the original error.
    if upload_id is not None and not completed:
        try:
            client.abort_multipart_upload(bucket_name, object_key, upload_id)
        except Exception as abort_error:
            print('fail to abort multipart upload {}: {}'.format(upload_id, abort_error))
您可以通过 abort_multipart_upload 方法来取消分片上传任务。当一个分片任务被取消后, TOS 会将已上传的分片数据删除,同时您无法再对此分片任务进行任何操作。
以下代码用于取消桶 bucket-test
中对象 object-test
的分片上传任务。
# Sample: abort a multipart upload identified by its upload_id.
import os

import tos


def _print_server_error(err):
    """Print the fields of a TosServerError, one per line."""
    print('fail with server error, code: {}'.format(err.code))
    # The request id pinpoints the failing request; keeping it in the logs is
    # strongly recommended.
    print('error with request id: {}'.format(err.request_id))
    print('error with message: {}'.format(err.message))
    print('error with http code: {}'.format(err.status_code))
    print('error with ec: {}'.format(err.ec))
    print('error with request url: {}'.format(err.request_url))


# Read the AK and SK from environment variables.
ak = os.environ.get('TOS_ACCESS_KEY')
sk = os.environ.get('TOS_SECRET_KEY')
# Set endpoint and region to the values of the region the bucket lives in.
# For China North 2 (Beijing), endpoint is tos-cn-beijing.volces.com and
# region is cn-beijing.
endpoint = "your endpoint"
region = "your region"
bucket_name = "bucket-test"
# Object name; for the file example_object.txt under example_dir, use
# example_dir/example_object.txt.
object_key = "object-test"
# The upload_id returned by create_multipart_upload.
upload_id = "your upload id"

try:
    client = tos.TosClientV2(ak, sk, endpoint, region)
    # Abort the multipart upload with the given upload_id; parts that were
    # already uploaded are deleted.
    client.abort_multipart_upload(bucket_name, object_key, upload_id)
except tos.exceptions.TosClientError as e:
    # Client-side failure, usually an invalid request parameter or a network
    # problem.
    print('fail with client error, message:{}, cause: {}'.format(e.message, e.cause))
except tos.exceptions.TosServerError as e:
    # Server-side failure; the details are carried by the exception.
    _print_server_error(e)
except Exception as e:
    print('fail with unknown error: {}'.format(e))
以下代码用于列举桶 bucket-test
中对象 object-test
已上传的分片信息。
# Sample: list the parts already uploaded for a multipart upload.
import os

import tos


def _print_server_error(err):
    """Print the fields of a TosServerError, one per line."""
    print('fail with server error, code: {}'.format(err.code))
    # The request id pinpoints the failing request; keeping it in the logs is
    # strongly recommended.
    print('error with request id: {}'.format(err.request_id))
    print('error with message: {}'.format(err.message))
    print('error with http code: {}'.format(err.status_code))
    print('error with ec: {}'.format(err.ec))
    print('error with request url: {}'.format(err.request_url))


# Read the AK and SK from environment variables.
ak = os.environ.get('TOS_ACCESS_KEY')
sk = os.environ.get('TOS_SECRET_KEY')
# Set endpoint and region to the values of the region the bucket lives in.
# For China North 2 (Beijing), endpoint is tos-cn-beijing.volces.com and
# region is cn-beijing.
endpoint = "your endpoint"
region = "your region"
bucket_name = "bucket-test"
# Object name; for the file example_object.txt under example_dir, use
# example_dir/example_object.txt.
object_key = "object-test"
# The upload_id returned by create_multipart_upload.
upload_id = "upload_id"

try:
    # Create the TosClientV2 object; all bucket and object operations go
    # through it.
    client = tos.TosClientV2(ak, sk, endpoint, region)
    # List the uploaded parts of the given upload_id, one page per request.
    truncated = True
    marker = 0
    while True:
        # part_number_marker is the position at which this page starts.
        out = client.list_parts(bucket_name, object_key, upload_id,
                                part_number_marker=marker)
        for part in out.parts:
            print('part_number:', part.part_number)
            print('etag:', part.etag)
            print('size', part.size)
        marker = out.next_part_number_marker
        # is_truncated is True while there are still parts left to list.
        truncated = out.is_truncated
        if not truncated:
            break
except tos.exceptions.TosClientError as e:
    # Client-side failure, usually an invalid request parameter or a network
    # problem.
    print('fail with client error, message:{}, cause: {}'.format(e.message, e.cause))
except tos.exceptions.TosServerError as e:
    # Server-side failure; the details are carried by the exception.
    _print_server_error(e)
except Exception as e:
    print('fail with unknown error: {}'.format(e))