使用断点续传上传的方式将文件上传到 TOS 时,您可以设置分片大小、上传分片的线程数、上传时客户端限速、事件回调函数等。上传过程中,如果出现网络异常或程序崩溃导致文件上传失败,再次上传时将从断点记录处继续上传尚未完成的部分。在上传过程中,可以通过调用传入的 CancelHook 的 Cancel 方法取消对象上传。
上传对象前,您需要拥有 tos:PutObject 权限,具体操作,请参见权限配置指南。
断点续传上传的进度信息会记录在 Checkpoint 文件中,所以程序需要对 Checkpoint 文件有写权限。
上传进度记录在 Checkpoint 文件中,如果上传过程中某一分片上传失败,再次上传时会从 Checkpoint 文件中记录的点继续上传。上传完成后,Checkpoint
文件会被删除。package main import ( "context" "fmt" "github.com/volcengine/ve-tos-golang-sdk/v2/tos" ) func checkErr(err error) { if err != nil { if serverErr, ok := err.(*tos.TosServerError); ok { fmt.Println("Error:", serverErr.Error()) fmt.Println("Request ID:", serverErr.RequestID) fmt.Println("Response Status Code:", serverErr.StatusCode) fmt.Println("Response Header:", serverErr.Header) fmt.Println("Response Err Code:", serverErr.Code) fmt.Println("Response Err Msg:", serverErr.Message) } else if clientErr, ok := err.(*tos.TosClientError); ok { fmt.Println("Error:", clientErr.Error()) fmt.Println("Client Cause Err:", clientErr.Cause.Error()) } else { fmt.Println("Error:", err) } panic(err) } } func main() { var ( accessKey = os.Getenv("TOS_ACCESS_KEY") secretKey = os.Getenv("TOS_SECRET_KEY") // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com endpoint = "https://tos-cn-beijing.volces.com" region = "cn-beijing" // 填写 BucketName bucketName = "*** Provide your bucket name ***" // 将文件上传到 example_dir 目录下的 example.txt 文件 objectKey = "example_dir/example.txt" ctx = context.Background() // 本地文件完整路径,例如usr/local/testfile.txt fileName = "/usr/local/testfile.txt" ) // 初始化客户端 client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey))) checkErr(err) // 直接使用文件路径上传文件 output, err := client.UploadFile(ctx, &tos.UploadFileInput{ CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{ Bucket: bucketName, Key: objectKey, }, // 上传的文件路径 FilePath: fileName, // 上传时指定分片大小 PartSize: tos.DefaultPartSize, // 分片上传任务并发数量 TaskNum: 5, // 开启断点续传 EnableCheckpoint: true, }) checkErr(err) fmt.Println("PutObjectV2 Request ID:", output.RequestID) }
package main import ( "context" "errors" "fmt" "io/ioutil" "os" "github.com/volcengine/ve-tos-golang-sdk/v2/tos" ) var ( accessKey = os.Getenv("TOS_ACCESS_KEY") secretKey = os.Getenv("TOS_SECRET_KEY") // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com endpoint = "https://tos-cn-beijing.volces.com" region = "cn-beijing" // 填写 BucketName bucketName = "*** Provide your bucket name ***" // 本地文件夹路径 dirPath = "/usr/local/" ) func checkErr(err error) { if err != nil { if serverErr, ok := err.(*tos.TosServerError); ok { fmt.Println("Error:", serverErr.Error()) fmt.Println("Request ID:", serverErr.RequestID) fmt.Println("Response Status Code:", serverErr.StatusCode) fmt.Println("Response Header:", serverErr.Header) fmt.Println("Response Err Code:", serverErr.Code) fmt.Println("Response Err Msg:", serverErr.Message) } else if clientErr, ok := err.(*tos.TosClientError); ok { fmt.Println("Error:", clientErr.Error()) fmt.Println("Client Cause Err:", clientErr.Cause.Error()) } else { fmt.Println("Error:", err) } panic(err) } } var cli *tos.ClientV2 func uploadDir(ctx context.Context, dirPath string) error { file, err := os.Stat(dirPath) if err != nil { return err } if !file.IsDir() { return errors.New("please input file path. 
") } files, err := ioutil.ReadDir(dirPath) for _, f := range files { if f.IsDir() { err = uploadDir(ctx, dirPath+f.Name()) if err != nil { return nil } continue } output, err := cli.UploadFile(ctx, &tos.UploadFileInput{ CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{ Bucket: bucketName, Key: dirPath + f.Name(), }, // 上传的文件路径 FilePath: dirPath + f.Name(), // 上传时指定分片大小 PartSize: tos.DefaultPartSize, // 分片上传任务并发数量 TaskNum: 5, // 开启断点续传 EnableCheckpoint: true, }) checkErr(err) fmt.Println("PutObjectV2 Request ID:", output.RequestID) } } func main() { // 初始化客户端 var err error cli, err = tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey))) checkErr(err) ctx := context.Background() err = uploadDir(ctx, dirPath) if err != nil { panic(err) } }
断点续传上传时可通过实现 tos.DataTransferListener 接口(即实现 DataTransferStatusChange 方法)接收上传进度,代码示例如下。
package main import ( "context" "fmt" "github.com/volcengine/ve-tos-golang-sdk/v2/tos" "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum" ) // 自定义进度回调,需要实现 tos.DataTransferStatusChange 接口 type listener struct { } func (l *listener) DataTransferStatusChange(event *tos.DataTransferStatus) { switch event.Type { case enum.DataTransferStarted: fmt.Println("Data transfer started") case enum.DataTransferRW: // Chunk 模式下 TotalBytes 值为 -1 if event.TotalBytes != -1 { fmt.Printf("Once Read:%d,ConsumerBytes/TotalBytes: %d/%d,%d%%\n", event.RWOnceBytes, event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes) } else { fmt.Printf("Once Read:%d,ConsumerBytes:%d\n", event.RWOnceBytes, event.ConsumedBytes) } case enum.DataTransferSucceed: fmt.Printf("Data Transfer Succeed, ConsumerBytes/TotalBytes: %d/%d,%d%%\n", event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes) case enum.DataTransferFailed: fmt.Printf("Data Transfer Failed\n") } } func checkErr(err error) { if err != nil { if serverErr, ok := err.(*tos.TosServerError); ok { fmt.Println("Error:", serverErr.Error()) fmt.Println("Request ID:", serverErr.RequestID) fmt.Println("Response Status Code:", serverErr.StatusCode) fmt.Println("Response Header:", serverErr.Header) fmt.Println("Response Err Code:", serverErr.Code) fmt.Println("Response Err Msg:", serverErr.Message) } else if clientErr, ok := err.(*tos.TosClientError); ok { fmt.Println("Error:", clientErr.Error()) fmt.Println("Client Cause Err:", clientErr.Cause.Error()) } else { fmt.Println("Error:", err) } panic(err) } } func main() { var ( accessKey = os.Getenv("TOS_ACCESS_KEY") secretKey = os.Getenv("TOS_SECRET_KEY") // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com endpoint = "https://tos-cn-beijing.volces.com" region = "cn-beijing" // 填写 BucketName bucketName = "*** Provide your bucket name ***" // 将文件上传到 example_dir 目录下的 example.txt 文件 objectKey = "example_dir/example.txt" ctx = 
context.Background() // 本地文件完整路径,例如usr/local/testfile.txt fileName = "/usr/local/testfile.txt" ) // 初始化客户端 client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey))) checkErr(err) // 直接使用文件路径上传文件 output, err := client.UploadFile(ctx, &tos.UploadFileInput{ CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{ Bucket: bucketName, Key: objectKey, }, // 上传的文件路径 FilePath: fileName, // 上传时指定分片大小 PartSize: tos.DefaultPartSize, // 分片上传任务并发数量 TaskNum: 5, // 开启断点续传 EnableCheckpoint: true, // 数据传输回调 DataTransferListener: &listener{}, }) checkErr(err) fmt.Println("PutObjectV2 Request ID:", output.RequestID) }
以下代码用于自定义断点续传上传回调函数。
package main import ( "context" "fmt" "github.com/volcengine/ve-tos-golang-sdk/v2/tos" "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum" ) // 自定义事件监听,需要实现 tos.UploadEventListener 接口 type eventChange struct { } func (e eventChange) EventChange(event *tos.UploadEvent) { switch event.Type { case enum.UploadEventCreateMultipartUploadSucceed: fmt.Printf("Upload to %s %s create multipart upload success, upload id:%s\n", event.Bucket, event.Key, event.UploadID) case enum.UploadEventCreateMultipartUploadFailed: fmt.Printf("Upload to %s %s create multipart upload fail, err:%v\n", event.Bucket, event.Key, event.Err) case enum.UploadEventUploadPartSucceed: fmt.Printf("Upload to %s %s part success, UploadPartInfo:%v\n", event.Bucket, event.Key, event.UploadPartInfo) case enum.UploadEventUploadPartAborted: fmt.Printf("Upload to %s %s part aborted, upload id:%s\n", event.Bucket, event.Key, event.UploadID) case enum.UploadEventUploadPartFailed: fmt.Printf("Upload to %s %s part fail, upload id:%s, err:%v\n", event.Bucket, event.Key, event.UploadID, event.Err) case enum.UploadEventCompleteMultipartUploadSucceed: fmt.Printf("Upload to %s %s success, upload id:%s\n", event.Bucket, event.Key, event.UploadID) case enum.UploadEventCompleteMultipartUploadFailed: fmt.Printf("Upload to %s %s fail, upload id:%s, err:%v\n", event.Bucket, event.Key, event.UploadID, event.Err) } } func checkErr(err error) { if err != nil { if serverErr, ok := err.(*tos.TosServerError); ok { fmt.Println("Error:", serverErr.Error()) fmt.Println("Request ID:", serverErr.RequestID) fmt.Println("Response Status Code:", serverErr.StatusCode) fmt.Println("Response Header:", serverErr.Header) fmt.Println("Response Err Code:", serverErr.Code) fmt.Println("Response Err Msg:", serverErr.Message) } else if clientErr, ok := err.(*tos.TosClientError); ok { fmt.Println("Error:", clientErr.Error()) fmt.Println("Client Cause Err:", clientErr.Cause.Error()) } else { fmt.Println("Error:", err) } panic(err) } } func main() { 
var ( accessKey = os.Getenv("TOS_ACCESS_KEY") secretKey = os.Getenv("TOS_SECRET_KEY") // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com endpoint = "https://tos-cn-beijing.volces.com" region = "cn-beijing" // 填写 BucketName bucketName = "*** Provide your bucket name ***" // 将文件上传到 example_dir 目录下的 example.txt 文件 objectKey = "example_dir/example.txt" ctx = context.Background() // 本地文件完整路径,例如usr/local/testfile.txt fileName = "/usr/local/testfile.txt" ) // 初始化客户端 client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey))) checkErr(err) // 直接使用文件路径上传文件 output, err := client.UploadFile(ctx, &tos.UploadFileInput{ CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{ Bucket: bucketName, Key: objectKey, }, // 上传的文件路径 FilePath: fileName, // 上传时指定分片大小 PartSize: tos.DefaultPartSize, // 分片上传任务并发数量 TaskNum: 5, // 开启断点续传 EnableCheckpoint: true, // 事件监听回调 UploadEventListener: eventChange{}, }) checkErr(err) fmt.Println("PutObjectV2 Request ID:", output.RequestID) }
断点续传上传时可以通过客户端使用 tos.RateLimiter 接口对所占用的带宽进行限制,代码如下所示。
package main import ( "context" "fmt" "sync" "time" "github.com/volcengine/ve-tos-golang-sdk/v2/tos" ) func checkErr(err error) { if err != nil { if serverErr, ok := err.(*tos.TosServerError); ok { fmt.Println("Error:", serverErr.Error()) fmt.Println("Request ID:", serverErr.RequestID) fmt.Println("Response Status Code:", serverErr.StatusCode) fmt.Println("Response Header:", serverErr.Header) fmt.Println("Response Err Code:", serverErr.Code) fmt.Println("Response Err Msg:", serverErr.Message) } else if clientErr, ok := err.(*tos.TosClientError); ok { fmt.Println("Error:", clientErr.Error()) fmt.Println("Client Cause Err:", clientErr.Cause.Error()) } else { fmt.Println("Error:", err) } panic(err) } } type rateLimit struct { rate int64 capacity int64 currentAmount int64 sync.Mutex lastConsumeTime time.Time } func NewDefaultRateLimit(rate int64, capacity int64) tos.RateLimiter { return &rateLimit{ rate: rate, capacity: capacity, lastConsumeTime: time.Now(), currentAmount: capacity, Mutex: sync.Mutex{}, } } func (d *rateLimit) Acquire(want int64) (ok bool, timeToWait time.Duration) { d.Lock() defer d.Unlock() if want > d.capacity { want = d.capacity } increment := int64(time.Now().Sub(d.lastConsumeTime).Seconds() * float64(d.rate)) if increment+d.currentAmount > d.capacity { d.currentAmount = d.capacity } else { d.currentAmount += increment } if want > d.currentAmount { timeToWaitSec := float64(want-d.currentAmount) / float64(d.rate) return false, time.Duration(timeToWaitSec * float64(time.Second)) } d.lastConsumeTime = time.Now() d.currentAmount -= want return true, 0 } func main() { var ( accessKey = os.Getenv("TOS_ACCESS_KEY") secretKey = os.Getenv("TOS_SECRET_KEY") // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com endpoint = "https://tos-cn-beijing.volces.com" region = "cn-beijing" // 填写 BucketName bucketName = "*** Provide your bucket name ***" // 将文件上传到 example_dir 目录下的 example.txt 文件 objectKey = "example_dir/example.txt" ctx = 
context.Background() // 本地文件完整路径,例如usr/local/testfile.txt fileName = "/usr/local/testfile.txt" ) // 初始化客户端 client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey))) checkErr(err) // 直接使用文件路径上传文件 rateLimit1m := int64(1024 * 1024) output, err := client.UploadFile(ctx, &tos.UploadFileInput{ CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{ Bucket: bucketName, Key: objectKey, }, // 上传的文件路径 FilePath: fileName, // 上传时指定分片大小 PartSize: tos.DefaultPartSize, // 分片上传任务并发数量 TaskNum: 5, // 开启断点续传 EnableCheckpoint: true, // 上传客户端限速 RateLimiter: NewDefaultRateLimit(rateLimit1m, rateLimit1m), }) checkErr(err) fmt.Println("PutObjectV2 Request ID:", output.RequestID) }
以下代码用于在运行时取消正在执行的断点续传上传任务。
package main import ( "context" "fmt" "github.com/volcengine/ve-tos-golang-sdk/v2/tos" ) func checkErr(err error) { if err != nil { if serverErr, ok := err.(*tos.TosServerError); ok { fmt.Println("Error:", serverErr.Error()) fmt.Println("Request ID:", serverErr.RequestID) fmt.Println("Response Status Code:", serverErr.StatusCode) fmt.Println("Response Header:", serverErr.Header) fmt.Println("Response Err Code:", serverErr.Code) fmt.Println("Response Err Msg:", serverErr.Message) } else if clientErr, ok := err.(*tos.TosClientError); ok { fmt.Println("Error:", clientErr.Error()) fmt.Println("Client Cause Err:", clientErr.Cause.Error()) } else { fmt.Println("Error:", err) } panic(err) } } func main() { var ( accessKey = os.Getenv("TOS_ACCESS_KEY") secretKey = os.Getenv("TOS_SECRET_KEY") // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com endpoint = "https://tos-cn-beijing.volces.com" region = "cn-beijing" // 填写 BucketName bucketName = "*** Provide your bucket name ***" // 将文件上传到 example_dir 目录下的 example.txt 文件 objectKey = "example_dir/example.txt" ctx = context.Background() // 本地文件完整路径,例如usr/local/testfile.txt fileName = "/usr/local/testfile.txt" ) // 初始化客户端 client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey))) checkErr(err) // 直接使用文件路径上传文件 cancel := tos.NewCancelHook() output, err := client.UploadFile(ctx, &tos.UploadFileInput{ CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{ Bucket: bucketName, Key: objectKey, }, // 上传的文件路径 FilePath: fileName, // 上传时指定分片大小 PartSize: tos.DefaultPartSize, // 分片上传任务并发数量 TaskNum: 5, // 开启断点续传 EnableCheckpoint: true, // 取消上传 CancelHook: cancel, }) checkErr(err) fmt.Println("PutObjectV2 Request ID:", output.RequestID) }