/collection/upsert_data 接口用于在指定的数据集 Collection 内写入数据。指定写入的数据是一个数组,允许单次插入一条数据或者多条数据,单次最多可插入100条数据。
说明
当前不支持更新部分字段,每次写入数据都要更新所有字段。写入数据时,如果 Collection 中已存在相同主键的数据,则会覆盖源数据;如果 Collection 中没有相同主键的数据,则会写入新数据。
说明
请求向量数据库 VikingDB 的 OpenAPI 接口时,需要构造签名进行鉴权,详细的 OpenAPI 签名调用方法请参见 API签名调用指南。
URI | /api/collection/upsert_data | 统一资源标识符 |
---|---|---|
请求方法 | POST | 客户端对向量数据库服务器请求的操作类型 |
请求头 | Content-Type: application/json | 请求消息类型 |
Authorization: HMAC-SHA256 *** | 鉴权 |
参数名 | 类型 | 是否必选 | 参数说明 |
---|---|---|---|
collection_name/collection_alias | string | 是 | 指定写入数据的 Collection 名称/别名。
|
fields | array | 是 | 指定写入的数据。
|
ttl | int | 否 | 数据过期时间,单位为秒。
|
async | bool | 否 | 是否异步请求接口,适用于大规模数据的写入场景,性能提升10倍。
|
参数 | 参数说明 |
---|---|
code | 状态码 |
message | 返回信息 |
request_id | 标识每个请求的唯一标识符 |
状态码 | http状态码 | 返回信息 | 状态码说明 |
---|---|---|---|
0 | 200 | success | 在指定 Collection 中写入数据成功。 |
1000005 | 400 | collection not exist | Collection 不存在。 |
1000003 | 400 | invalid request:%s | 非法参数:
|
1000001 | 401 | unauthorized | 请求头中缺乏鉴权信息。 |
1000002 | 403 | no permission | 权限不足。 |
以同步写入向量数据为例:
curl -i -X POST \ -H 'Content-Type: application/json' \ -H 'Authorization: HMAC-SHA256 ***' \ https://api-vikingdb.volces.com/api/collection/upsert_data \ -d '{ "collection_name": "test_name", "ttl": 86400, // 表示数据一天后过期 "fields": [ { "id": 1000, "text": "hello world!", "vector_field": [0.10, 0.13.........0.52], "sparse_vector_field": {"hello": 0.34, "world": 0.03, "!": 0.11}, "time": 1690529701, "author": "laoshe" }, ... ] }'
以同步写入文本数据为例:
curl -i -X POST \ -H 'Content-Type: application/json' \ -H 'Authorization: HMAC-SHA256 ***' \ https://api-vikingdb.volces.com/api/collection/upsert_data \ -d '{ "collection_name": "test_raw_data_collection", "fields": [{ "id": 1001, "gap": 1.1, "content": "hello world" }] }'
异步写入文本数据示例:
import multiprocessing import struct, base64, uuid, tqdm, time from volcengine.auth.SignerV4 import SignerV4 from volcengine.Credentials import Credentials from volcengine.base.Request import Request import sys, requests, json, time, struct, base64 DOMAIN = '' AK = '' SK = '' def prepare_request(method, path, ak, sk, params=None, data=None, doseq=0, account_id=None): if params: for key in params: if type(params[key]) == int or type(params[key]) == float or type(params[key]) == bool: params[key] = str(params[key]) elif sys.version_info[0] != 3: if type(params[key]) == unicode: params[key] = params[key].encode('utf-8') elif type(params[key]) == list: if not doseq: params[key] = ','.join(params[key]) r = Request() r.set_shema("http") r.set_method(method) r.set_connection_timeout(10) r.set_socket_timeout(10) mheaders = { 'Accept': 'application/json', 'Content-Type': 'application/json', 'Host': DOMAIN, } if account_id is not None: mheaders["V-Account-Id"] = account_id r.set_headers(mheaders) if params: r.set_query(params) r.set_host(DOMAIN) r.set_path(path) if data is not None: r.set_body(json.dumps(data)) credentials = Credentials(ak, sk, 'air', 'cn-shanghai') SignerV4.sign(r, credentials) return r queue = multiprocessing.Queue(maxsize=10) event = multiprocessing.Event() def consumer(): """消费者函数:从队列中取出数据并处理""" items = [] while not event.is_set() or not queue.empty(): item = queue.get() items.append(item) if len(items) == 50: info_collection_req = prepare_request(method = "POST", path = "/api/collection/upsert_data", ak = AK, sk = SK, data = {"collection_name":"","fields":items,"async":True}) r = requests.request(method=info_collection_req.method, url="http://{}{}".format(DOMAIN, info_collection_req.path), headers=info_collection_req.headers, data=info_collection_req.body, timeout=10000 ) items = [] print("Consumer received event. Exiting...") if __name__ == "__main__": # 创建消费者进程 processors = [] for i in range(10): p = multiprocessing.Process(target=consumer) p.start() processors.append(p) # 准备数据, datas = [] for i in range(100000): # 压缩向量 float_array = [0.124135132531424]*1024 packed_data = struct.pack('f'*len(float_array), *float_array) s = base64.b64encode(packed_data).decode() uuid4 = uuid.uuid4() # 此处用户可修改为自己希望的id datas.append({"id": str(uuid4), "text_vertor": s}) for data in tqdm.tqdm(datas): queue.put(data) # 通知消费者停止工作 event.set() for p in processors: p.join() print("Main process exiting...")
执行成功返回:
HTTP/1.1 200 OK Content-Length: 43 Content-Type: application/json {"code":0,"msg":"success","request_id":"021695029757920fd001de6666600000000000000000002569b8f"}
执行失败返回:
HTTP/1.1 400 OK Content-Length: 43 Content-Type: application/json {"code":1000005, "msg":"collection not exist", "request_id":"021695029757920fd001de6666600000000000000000002569b8f"}