You need to enable JavaScript to run this app.
导航
UpsertData
最近更新时间:2024.11.26 17:38:26首次发布时间:2024.04.17 14:21:08

概述

UpsertData 用于在指定的数据集 Collection 内写入数据。指定写入的数据是一个数组,允许单次插入一条数据或者多条数据,单次最多可插入100条数据。

说明

当前不支持更新部分字段,每次写入数据都要更新所有字段。写入数据时,如果 Collection 中已存在相同主键的数据,则会覆盖源数据;如果 Collection 中没有相同主键的数据,则会写入新数据。

请求参数

参数名

子参数

类型

是否必选

参数说明

data

说明

Data 结构体或其构成的列表。

fields

array<map>

指定写入的数据。

  • 单次写入的数据数目不超过100。
  • 每条数据作为一个 map,key 为字段名,value 为字段值。
  • 数据写入时 fields 长度最大为65535,超过限制时会返回报错 “fields data is too long, should be less than 65535”。
  • 不同字段类型的字段值格式如下:
    • int64:格式是整型数值。
    • float:格式是浮点数值。
    • string:格式是字符串。
    • bool:格式是 true/false。
    • list<string>:格式是字符串数组。
    • list<int64>:格式是整型数组。
    • vector:格式是向量(浮点数数组)。
    • sparse_vector:格式是 json 字典,k 为 string 类型,表示关键词的字面量,v 为 float 类型,表示该关键词的权重数值。
    • text:格式是 map<string, string>,当前支持 text 。
      • text:以 string 形式写入文本原始数据, 如 {"text": "hello world"}。

TTL

int

数据过期时间,单位为秒。

  • 格式:0 和正整数。
  • 默认值:默认为0,表示数据不过期。
  • 当 ttl 设置为86400时,表示1天后数据自动删除。
  • 数据 ttl 删除,不会立刻更新到索引。

async_upsert

bool

是否异步请求接口,适用于大规模数据的写入场景,性能提升10倍。

  • True:表明异步请求写入。
  • 默认值:默认为False,表示正常请求。

示例

请求参数

//GetCollection获取指定数据集,程序初始化时调用即可,无需重复调用  
collection, _ := service.GetCollection("go")
//构建向量
field1 := map[string]interface{}{
    "doc_id":      "111",
    "text_vector": genRandomVector(12),
    "text_sparse_vector":map[string]float64{"hello": 0.34, "world": 0.03, "!": 0.11}
    "like":        1,
    "price":       1.11,
    "author":      []string{"gy"},
    "aim":         true,
}
field2 := map[string]interface{}{
    "doc_id":      "222",
    "text_vector": genRandomVector(12),
    "text_sparse_vector": map[string]float64{"hello": 0.34, "world": 0.03, "!": 0.11}
    "like":        2,
    "price":       2.22,
    "author":      []string{"gy", "xjq"},
    "aim":         false,
}
field3 := map[string]interface{}{
    "doc_id":      "333",
    "text_vector": genRandomVector(12),
    "text_sparse_vector": map[string]float64{"hello": 0.34, "world": 0.03, "!": 0.11}
    "like":        3,
    "price":       3.33,
    "author":      []string{"gy"},
    "aim":         true,
}
field4 := map[string]interface{}{
    "doc_id":      "444",
    "text_vector": genRandomVector(12),
    "text_sparse_vector": map[string]float64{"hello": 0.34, "world": 0.03, "!": 0.11}
    "like":        4,
    "price":       4.44,
    "author":      []string{"gy"},
    "aim":         true,
}
data1 := vikingdb.Data{
    Fields: field1,
    TTL:    100000,
}
data2 := vikingdb.Data{
    Fields: field2,
    TTL:    200000,
}
data3 := vikingdb.Data{
    Fields: field3,
    TTL:    100000,
}
data4 := vikingdb.Data{
    Fields: field4,
}
datas := []vikingdb.Data{
    data1,
    data2,
    data3,
    data4,
}
err := collection.UpsertData(datas)
if err != nil {
    print(err.Error())
}

异步写入数据示例:

var wg sync.WaitGroup

func consumer(queue <-chan vikingdb.Data, vikingDBService *vikingdb.VikingDBService, stopChan <-chan struct{}) {
    defer wg.Done()
    collection, err := vikingDBService.GetCollection("")
    if err != nil {
        fmt.Println(err)
    }
    items := make([]vikingdb.Data, 0, 50)

    for {
        select {
        case item := <-queue:
            items = append(items, item)
            if len(items) == 50 {
                collection.UpsertData(items, vikingdb.WithAsyncUpsert(true))
                items = items[:0]
            }
        case <-stopChan:
            if len(items) > 0 {
                collection.UpsertData(items, vikingdb.WithAsyncUpsert(true))
            }
            return
        }
    }
}

func main() {
    queue := make(chan vikingdb.Data, 10)
    stopChan := make(chan struct{})
    vikingDBService := vikingdb.NewVikingDBService()

    // 创建消费者
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go consumer(queue, vikingDBService, stopChan)
    }

    // 准备数据
    num := 10000
    floatArray := make([]float32, 1024)
    for j := range floatArray {
        floatArray[j] = 0.124135132531424
    }

    var datas []vikingdb.Data
    for i := 0; i < num; i++ {
        packedData := make([]byte, 1024*4)
        for j, v := range floatArray {
            binary.LittleEndian.PutUint32(packedData[j*4:], math.Float32bits(v))
        }
        s := base64.StdEncoding.EncodeToString(packedData)
        id := uuid.New().String()
        field := map[string]interface{}{
            "id":          id,
            "text_vector": s,
        }
        datas = append(datas, vikingdb.Data{Fields: field})
    }
    progressBar := pb.StartNew(num)

    for _, data := range datas {
        queue <- data
        progressBar.Increment()
    }
    close(stopChan)

    progressBar.Finish()

    // 等待所有消费者完成工作
    wg.Wait()
    fmt.Println("Main process exiting...")
}

返回值

Go 调用执行上面的任务,执行成功无返回信息。