You need to enable JavaScript to run this app.
导航
upsert_data
最近更新时间:2025.01.10 19:13:06首次发布时间:2024.04.17 14:21:06

概述

upsert_data 用于在指定的数据集 Collection 内写入数据。指定写入的数据是一个数组,允许单次插入一条数据或者多条数据,单次最多可插入100条数据。

请求参数

参数名

子参数

类型

是否必选

参数说明

Data

说明

Data 实例或者 Data 实例列表。

fields

array<map>

指定写入的数据。

  • 单次写入的数据数目不超过100。
  • 每条数据作为一个 map,key 为字段名,value 为字段值。
  • 数据写入时 fields 长度最大为65535,超过限制时会返回报错 “fields data is too long, should be less than 65535”。
  • 不同字段类型的字段值格式如下:
    • int64:格式是整型数值。
    • float:格式是浮点数值。
    • string:格式是字符串。
    • bool:格式是 true/false。
    • list<string>:格式是字符串数组。
    • list<int64>:格式是整型数组。
    • vector:格式是向量(浮点数数组)。
    • sparse_vector:格式是 json 字典,k 为 string 类型,表示关键词的字面量,v 为 float 类型,表示该关键词的权重数值。
    • text:格式是 map<string, string>,当前支持 text 。
      • text:以 string 形式写入文本原始数据, 如 {"text": "hello world"}。

ttl

int

数据过期时间,单位为秒。

  • 格式:0 和正整数。
  • 默认值:默认为0,表示数据不过期。
  • 当 ttl 设置为86400时,表示1天后数据自动删除。
  • 数据 ttl 删除,不会立刻更新到索引。

async_upsert

bool

是否异步请求接口,适用于大规模数据的写入场景,性能提升10倍。

  • True:表明异步请求写入。
  • 默认值:默认为False,表示正常请求。

示例

请求参数

同步写入数据示例:

# 获取指定数据集,程序初始化时调用即可,无需重复调用
collection = vikingdb_service.get_collection("example")
# 构建向量
def gen_random_vector(dim):
    res = [0, ] * dim
    for i in range(dim):
        res[i] = random.random() - 0.5
    return res
   
field1 = {"doc_id": "11", "text_vector": gen_random_vector(12), "text_sparse_vector": {"hello": 0.34, "world": 0.03, "!": 0.11}, "like": 1, "price": 1.11,
          "author": ["gy"], "aim": True}
field2 = {"doc_id": "22", "text_vector": gen_random_vector(12), "text_sparse_vector": {"hi": 0.12, "there": 0.043, "!": 0.5},"like": 2, "price": 2.22,
          "author": ["gy", "xjq"], "aim": False}
data1 = Data(field1)
data2 = Data(field2)
datas = []
datas.append(data1)
datas.append(data2)
collection.upsert_data(datas)  


# 异步调用
async def upsert_data():
    def gen_random_vector(dim):
        res = [0, ] * dim
        for i in range(dim):
            res[i] = random.random() - 0.5
        return res


    collection = await vikingdb_service.async_get_collection("async")
    field1 = {"doc_id": "111", "text_vector": gen_random_vector(10), "like": 1, "price": 1.11,
             "author": ["gy"], "aim": True}
    field2 = {"doc_id": "222", "text_vector": gen_random_vector(10), "like": 2, "price": 2.22,
             "author": ["gy", "xjq"], "aim": False}
    field3 = {"doc_id": "333", "text_vector": gen_random_vector(10), "like": 3, "price": 3.33,
             "author": ["gy", "xjq"], "aim": False}
    field4 = {"doc_id": "444", "text_vector": gen_random_vector(10), "like": 4, "price": 4.44,
             "author": ["gy", "xjq"], "aim": False}
    data1 = Data(field1)
    data2 = Data(field2)
    data3 = Data(field3)
    data4 = Data(field4)
    datas = [data1, data2, data3, data4]
    await collection.async_upsert_data(datas)
asyncio.run(upsert_data())

异步写入数据示例:

import multiprocessing
import struct, base64, uuid, tqdm, time
from volcengine.viking_db import *

queue = multiprocessing.Queue(maxsize=10)
event = multiprocessing.Event()
def consumer():
    """消费者函数:从队列中取出数据并处理"""
    vikingdb_service = VikingDBService()
    vikingdb_service.set_ak("ak")
    vikingdb_service.set_sk("sk")
    collection = vikingdb_service.get_collection("")
    items = []
    while not event.is_set() or not queue.empty():
        item = queue.get()
        items.append(item)
        if len(items) == 50:
            collection.upsert_data(items, async_upsert=True)
            items = []

    print("Consumer received event. Exiting...")

if __name__ == "__main__":
    # 创建消费者进程
    processors = []
    for i in range(10):
        p = multiprocessing.Process(target=consumer)
        p.start()
        processors.append(p)

    #  准备数据, 
    datas = []
    for i in range(100000):  
        # 压缩向量
        float_array = [0.124135132531424]*1024
        packed_data = struct.pack('f'*len(float_array), *float_array)
        s =  base64.b64encode(packed_data).decode()
        uuid4 = uuid.uuid4()  # 此处用户可修改为自己希望的id
        datas.append(Data({"id": str(uuid4), "text_vertor": s}))
    
    for data in tqdm.tqdm(datas):
        queue.put(data)
    # 通知消费者停止工作
    event.set()
    for p in processors:
        p.join()

    print("Main process exiting...")

返回值

Python 调用执行上面的任务,执行成功无返回信息。