upsert_data 用于在指定的数据集 Collection 内写入数据。指定写入的数据是一个数组,允许单次插入一条数据或者多条数据,单次最多可插入100条数据。
说明
当前不支持更新部分字段,每次写入数据都要更新所有字段。写入数据时,如果 Collection 中已存在相同主键的数据,则会覆盖源数据;如果 Collection 中没有相同主键的数据,则会写入新数据。
参数名 | 子参数 | 类型 | 是否必选 | 参数说明 |
---|---|---|---|---|
Data 说明 Data 实例或者 Data 实例列表。 | fields | array<map> | 是 | 指定写入的数据。
|
ttl | int | 否 | 数据过期时间,单位为秒。
| |
async_upsert | bool | 否 | 是否异步请求接口,适用于大规模数据的写入场景,性能提升10倍。
|
同步写入数据示例:
# 获取指定数据集,程序初始化时调用即可,无需重复调用 collection = vikingdb_service.get_collection("example")
# 构建向量 def gen_random_vector(dim): res = [0, ] * dim for i in range(dim): res[i] = random.random() - 0.5 return res field1 = {"doc_id": "11", "text_vector": gen_random_vector(12), "text_sparse_vector": {"hello": 0.34, "world": 0.03, "!": 0.11}, "like": 1, "price": 1.11, "author": ["gy"], "aim": True} field2 = {"doc_id": "22", "text_vector": gen_random_vector(12), "text_sparse_vector": {"hi": 0.12, "there": 0.043, "!": 0.5},"like": 2, "price": 2.22, "author": ["gy", "xjq"], "aim": False} data1 = Data(field1) data2 = Data(field2) datas = [] datas.append(data1) datas.append(data2) collection.upsert_data(datas) # 异步调用 async def upsert_data(): def gen_random_vector(dim): res = [0, ] * dim for i in range(dim): res[i] = random.random() - 0.5 return res collection = await vikingdb_service.async_get_collection("async") field1 = {"doc_id": "111", "text_vector": gen_random_vector(10), "like": 1, "price": 1.11, "author": ["gy"], "aim": True} field2 = {"doc_id": "222", "text_vector": gen_random_vector(10), "like": 2, "price": 2.22, "author": ["gy", "xjq"], "aim": False} field3 = {"doc_id": "333", "text_vector": gen_random_vector(10), "like": 3, "price": 3.33, "author": ["gy", "xjq"], "aim": False} field4 = {"doc_id": "444", "text_vector": gen_random_vector(10), "like": 4, "price": 4.44, "author": ["gy", "xjq"], "aim": False} data1 = Data(field1) data2 = Data(field2) data3 = Data(field3) data4 = Data(field4) datas = [data1, data2, data3, data4] await collection.async_upsert_data(datas) asyncio.run(upsert_data())
异步写入数据示例:
import multiprocessing import struct, base64, uuid, tqdm, time from volcengine.viking_db import * queue = multiprocessing.Queue(maxsize=10) event = multiprocessing.Event() def consumer(): """消费者函数:从队列中取出数据并处理""" vikingdb_service = VikingDBService() vikingdb_service.set_ak("ak") vikingdb_service.set_sk("sk") collection = vikingdb_service.get_collection("") items = [] while not event.is_set() or not queue.empty(): item = queue.get() items.append(item) if len(items) == 50: collection.upsert_data(items, async_upsert=True) items = [] print("Consumer received event. Exiting...") if __name__ == "__main__": # 创建消费者进程 processors = [] for i in range(10): p = multiprocessing.Process(target=consumer) p.start() processors.append(p) # 准备数据, datas = [] for i in range(100000): # 压缩向量 float_array = [0.124135132531424]*1024 packed_data = struct.pack('f'*len(float_array), *float_array) s = base64.b64encode(packed_data).decode() uuid4 = uuid.uuid4() # 此处用户可修改为自己希望的id datas.append(Data({"id": str(uuid4), "text_vertor": s})) for data in tqdm.tqdm(datas): queue.put(data) # 通知消费者停止工作 event.set() for p in processors: p.join() print("Main process exiting...")
Python 调用执行上面的任务,执行成功无返回信息。