UpsertData 用于在指定的数据集 Collection 内写入数据。指定写入的数据是一个数组,允许单次插入一条数据或者多条数据,单次最多可插入100条数据。
说明
当前不支持更新部分字段,每次写入数据都要更新所有字段。写入数据时,如果 Collection 中已存在相同主键的数据,则会覆盖源数据;如果 Collection 中没有相同主键的数据,则会写入新数据。
参数名 | 子参数 | 类型 | 是否必选 | 参数说明 |
---|---|---|---|---|
data 说明 Data 结构体或其构成的列表。 | fields | array<map> | 是 | 指定写入的数据。
|
TTL | int | 否 | 数据过期时间,单位为秒。
| |
async_upsert | bool | 否 | 是否异步请求接口,适用于大规模数据的写入场景,性能提升10倍。
|
//GetCollection获取指定数据集,程序初始化时调用即可,无需重复调用 collection, _ := service.GetCollection("go")
//构建向量 field1 := map[string]interface{}{ "doc_id": "111", "text_vector": genRandomVector(12), "text_sparse_vector":map[string]float64{"hello": 0.34, "world": 0.03, "!": 0.11} "like": 1, "price": 1.11, "author": []string{"gy"}, "aim": true, } field2 := map[string]interface{}{ "doc_id": "222", "text_vector": genRandomVector(12), "text_sparse_vector": map[string]float64{"hello": 0.34, "world": 0.03, "!": 0.11} "like": 2, "price": 2.22, "author": []string{"gy", "xjq"}, "aim": false, } field3 := map[string]interface{}{ "doc_id": "333", "text_vector": genRandomVector(12), "text_sparse_vector": map[string]float64{"hello": 0.34, "world": 0.03, "!": 0.11} "like": 3, "price": 3.33, "author": []string{"gy"}, "aim": true, } field4 := map[string]interface{}{ "doc_id": "444", "text_vector": genRandomVector(12), "text_sparse_vector": map[string]float64{"hello": 0.34, "world": 0.03, "!": 0.11} "like": 4, "price": 4.44, "author": []string{"gy"}, "aim": true, } data1 := vikingdb.Data{ Fields: field1, TTL: 100000, } data2 := vikingdb.Data{ Fields: field2, TTL: 200000, } data3 := vikingdb.Data{ Fields: field3, TTL: 100000, } data4 := vikingdb.Data{ Fields: field4, } datas := []vikingdb.Data{ data1, data2, data3, data4, } err := collection.UpsertData(datas) if err != nil { print(err.Error()) }
异步写入数据示例:
var wg sync.WaitGroup func consumer(queue <-chan vikingdb.Data, vikingDBService *vikingdb.VikingDBService, stopChan <-chan struct{}) { defer wg.Done() collection, err := vikingDBService.GetCollection("") if err != nil { fmt.Println(err) } items := make([]vikingdb.Data, 0, 50) for { select { case item := <-queue: items = append(items, item) if len(items) == 50 { collection.UpsertData(items, vikingdb.WithAsyncUpsert(true)) items = items[:0] } case <-stopChan: if len(items) > 0 { collection.UpsertData(items, vikingdb.WithAsyncUpsert(true)) } return } } } func main() { queue := make(chan vikingdb.Data, 10) stopChan := make(chan struct{}) vikingDBService := vikingdb.NewVikingDBService() // 创建消费者 for i := 0; i < 10; i++ { wg.Add(1) go consumer(queue, vikingDBService, stopChan) } // 准备数据 num := 10000 floatArray := make([]float32, 1024) for j := range floatArray { floatArray[j] = 0.124135132531424 } var datas []vikingdb.Data for i := 0; i < num; i++ { packedData := make([]byte, 1024*4) for j, v := range floatArray { binary.LittleEndian.PutUint32(packedData[j*4:], math.Float32bits(v)) } s := base64.StdEncoding.EncodeToString(packedData) id := uuid.New().String() field := map[string]interface{}{ "id": id, "text_vector": s, } datas = append(datas, vikingdb.Data{Fields: field}) } progressBar := pb.StartNew(num) for _, data := range datas { queue <- data progressBar.Increment() } close(stopChan) progressBar.Finish() // 等待所有消费者完成工作 wg.Wait() fmt.Println("Main process exiting...") }
Go 调用执行上面的任务,执行成功无返回信息。