用于将数据上传至火山引擎服务器。数据预同步、历史数据同步、增量天级数据同步、增量实时数据同步等均会涉及到此接口。
每次请求数据量不超过10000条,qps建议不超过100,每秒上传的数据条数不超过50000条(请求qps*每次请求中数据条数)。
若既有增量天级数据,也有增量实时数据,必须先接入增量天级数据,再接入增量实时数据。
若仅有增量实时数据,上传后不可再上传增量天级数据。
数据上传接口的超时时间应尽量大,例如设置为5s。当数据上传接口调用失败的话,应重新上传数据。
增量实时数据上报时,建议聚合一批数据一起上报(比如积攒1000条再上报),减小客户端和服务端频繁交互的压力。
WriteData(dataList []map[string]interface{}, topic string, opts ...option.Option (*WriteResponse, error)
go
参数 | 类型 | 说明 |
---|---|---|
dataList | []map[string]interface{} | 上传的具体数据,不同行业同步字段请按照数据规范填写 |
topic | String | 数据上传时的topic,如用户数据对应“user”,商品数据对应“item”,行为数据对应“behavior” |
opts | Option[] | 请求中可选参数,不同场景需要带上不同opts参数,包括timeout、stage、DataDate、RequestId。其中DataDate只需要在离线数据上传时使用。具体使用方式见用例 |
使用自定义的WriteResponse类作为响应类型,具体参数如下表所示。在获取到WriteResponse类型的返回值后可调用它的GetStatus()方法判断此次数据上传是否成功。
参数 | 类型 | 字段含义 | 获取方法 |
---|---|---|---|
Status | int | 状态码 | GetStatus |
Errors | DataError | 出错的数据 | GetErrors |
import (
"github.com/google/uuid"
"github.com/volcengine/volcengine-sdk-go-rec/byteair"
"github.com/volcengine/volcengine-sdk-go-rec/core/logs"
"github.com/volcengine/volcengine-sdk-go-rec/core/option"
"time"
)
// 已经初始化好的client.示例省略init()
var client byteair.Client
func Write() {
// 此处为示例数据,实际调用时需注意字段类型和格式
dataList := []map[string]interface{}{
{ // 第一条数据
"id": "item_id1",
"title": "test_title1",
"status": 0,
"brand": "volcengine",
"pub_time": 1583641807,
"current_price": 1.1,
},
{ // 第二条数据
"id": "item_id2",
"title": "test_title2",
"status": 1,
"brand": "volcengine",
"pub_time": 1583641503,
"current_price": 2.2,
},
}
// topic为枚举值,请参考API文档
topic := "item"
// 同步离线天级数据,需要指定日期
date, _ := time.Parse("2006-01-02", "2022-01-01")
opts := []option.Option{
// 预同步("pre_sync"),历史数据同步("history_sync"),增量天级同步("incremental_sync_daily"),增量实时同步("incremental_sync_streaming")
option.WithStage("pre_sync"),
// 必传,数据产生日期,实际传输时需修改为实际日期.增量实时同步可不传.
option.WithDataDate(date),
option.WithRequestId(uuid.NewString()),
}
rsp, err := client.WriteData(dataList, topic, opts...)
if err != nil {
logs.Error("[write] occur error, msg: %s", err.Error())
return
}
if !rsp.GetStatus().GetSuccess() {
logs.Error("[write] failure")
return
}
logs.Info("[write] success")
return
}
go