用于将数据上传至火山引擎服务器。数据预同步、历史数据同步、增量天级数据同步、增量实时数据同步等均会涉及到此接口。
每次请求数据量不超过10000条,qps建议不超过100,每秒上传的数据条数不超过50000条(请求qps*每次请求中数据条数)。
若既有增量天级数据,也有增量实时数据,必须先接入增量天级数据,再接入增量实时数据。
若仅有增量实时数据,上传后不可再上传增量天级数据。
数据上传接口的超时时间应尽量大,例如设置为5s。当数据上传接口调用失败的话,应重新上传数据。
增量实时数据上报时,建议聚合一批数据一起上报(比如积攒1000条再上报),减小客户端和服务端频繁交互的压力。
WriteData(dataList []map[string]interface{}, topic string, opts ...option.Option (*WriteResponse, error)
参数 | 类型 | 说明 |
---|---|---|
dataList | []map[string]interface{} | 上传的具体数据,不同行业同步字段请按照数据规范填写 |
topic | String | 数据上传时的topic,如用户数据对应“user”,商品数据对应“item”,行为数据对应“behavior” |
opts | Option[] | 请求中可选参数,不同场景需要带上不同opts参数,包括timeout、stage、DataDate、RequestId。其中DataDate只需要在离线数据上传时使用。具体使用方式见用例 |
使用自定义的WriteResponse类作为响应类型,具体参数如下表所示。在获取到WriteResponse类型的返回值后可调用它的GetStatus()方法判断此次数据上传是否成功。
参数 | 类型 | 字段含义 | 获取方法 |
---|---|---|---|
Status | int | 状态码 | GetStatus |
Errors | DataError | 出错的数据 | GetErrors |
import ( "github.com/google/uuid" "github.com/volcengine/volcengine-sdk-go-rec/byteair" "github.com/volcengine/volcengine-sdk-go-rec/core/logs" "github.com/volcengine/volcengine-sdk-go-rec/core/option" "time" ) // 已经初始化好的client.示例省略init() var client byteair.Client func Write() { // 此处为示例数据,实际调用时需注意字段类型和格式 dataList := []map[string]interface{}{ { // 第一条数据 "id": "item_id1", "title": "test_title1", "status": 0, "brand": "volcengine", "pub_time": 1583641807, "current_price": 1.1, }, { // 第二条数据 "id": "item_id2", "title": "test_title2", "status": 1, "brand": "volcengine", "pub_time": 1583641503, "current_price": 2.2, }, } // topic为枚举值,请参考API文档 topic := "item" // 同步离线天级数据,需要指定日期 date, _ := time.Parse("2006-01-02", "2022-01-01") opts := []option.Option{ // 预同步("pre_sync"),历史数据同步("history_sync"),增量天级同步("incremental_sync_daily"),增量实时同步("incremental_sync_streaming") option.WithStage("pre_sync"), // 必传,数据产生日期,实际传输时需修改为实际日期.增量实时同步可不传. option.WithDataDate(date), option.WithRequestId(uuid.NewString()), } rsp, err := client.WriteData(dataList, topic, opts...) if err != nil { logs.Error("[write] occur error, msg: %s", err.Error()) return } if !rsp.GetStatus().GetSuccess() { logs.Error("[write] failure") return } logs.Info("[write] success") return }