You need to enable JavaScript to run this app.
导航
upsert_data
最近更新时间:2024.11.07 12:01:53首次发布时间:2024.04.17 14:21:06

概述

/collection/upsert_data 接口用于在指定的数据集 Collection 内写入数据。指定写入的数据是一个数组,允许单次插入一条数据或者多条数据,单次最多可插入100条数据。

说明

当前不支持更新部分字段,每次写入数据都要更新所有字段。写入数据时,如果 Collection 中已存在相同主键的数据,则会覆盖源数据;如果 Collection 中没有相同主键的数据,则会写入新数据。

请求接口

说明

请求向量数据库 VikingDB 的 OpenAPI 接口时,需要构造签名进行鉴权,详细的 OpenAPI 签名调用方法请参见 API签名调用指南

URI

/api/collection/upsert_data

统一资源标识符

请求方法

POST

客户端对向量数据库服务器请求的操作类型

请求头

Content-Type: application/json

请求消息类型

Authorization: HMAC-SHA256 ***

鉴权

请求参数

参数名

类型

是否必选

参数说明

collection_name/collection_alias

string

指定写入数据的 Collection 名称/别名。

  • 只能使用英文字母、数字、下划线_,并以英文字母开头,不能为空。
  • 长度要求:[1, 128]。
  • Collection 名称/别名不能重复。

fields

array

指定写入的数据。

  • 单次写入的数据数目不超过100。
  • 每条数据作为一个 map,key 为字段名,value 为字段值。
  • 数据写入时 fields 长度最大为65535,超过限制时会返回报错 “fields data is too long, should be less than 65535”。
  • 不同字段类型的字段值格式如下:
    • int64:格式是整型数值。
    • float:格式是浮点数值。
    • string:格式是字符串。
    • bool:格式是 true/false。
    • list<string>:格式是字符串数组。
    • list<int64>:格式是整型数组。
    • vector:格式是向量(浮点数数组)。
    • sparse_vector:格式是 json 字典,k 为 string 类型,表示关键词的字面量,v 为 float 类型,表示该关键词的权重数值。
    • text:格式是 string。

ttl

int

数据过期时间,单位为秒。

  • 格式:0 和正整数。
  • 默认值:默认为0,表示数据不过期。
  • 当 ttl 设置为86400时,表示1天后数据自动删除。
  • 数据 ttl 删除,不会立刻更新到索引。

async

bool

是否异步请求接口,适用于大规模数据的写入场景,性能提升10倍。

  • True:表明异步请求写入。
  • 默认值:默认为False,表示正常请求。

响应消息

参数

参数说明

code

状态码

message

返回信息

request_id

标识每个请求的唯一标识符

状态码说明

状态码

http状态码

返回信息

状态码说明

0

200

success

在指定 Collection 中写入数据成功。

1000005

400

collection not exist

Collection 不存在。

1000003

400

invalid request:%s

非法参数:

  • 缺失必选参数。
  • 字段值与字段类型不匹配。

1000001

401

unauthorized

请求头中缺乏鉴权信息。

1000002

403

no permission

权限不足。

完整示例

请求消息

以同步写入向量数据为例:

curl -i -X POST \
  -H 'Content-Type: application/json' \
  -H 'Authorization: HMAC-SHA256 ***' \
  https://api-vikingdb.volces.com/api/collection/upsert_data \
  -d '{
    "collection_name": "test_name",
    "ttl": 86400, // 表示数据一天后过期
    "fields": [
        {
            "id": 1000,
            "text": "hello world!",
            "vector_field": [0.10, 0.13.........0.52],
            "sparse_vector_field": {"hello": 0.34, "world": 0.03, "!": 0.11},
            "time": 1690529701,
            "author": "laoshe"
        },
        
        ...                              
    ]
}'

以同步写入文本数据为例:

curl -i -X POST \
  -H 'Content-Type: application/json' \
  -H 'Authorization: HMAC-SHA256 ***' \
  https://api-vikingdb.volces.com/api/collection/upsert_data \
  -d '{
        "collection_name": "test_raw_data_collection",
        "fields": [{
                "id": 1001,  
                "gap": 1.1,
                "content": "hello world"
        }]
}'

异步写入文本数据示例:

import multiprocessing
import struct, base64, uuid, tqdm, time
from volcengine.auth.SignerV4 import SignerV4
from volcengine.Credentials import Credentials
from volcengine.base.Request import Request
import sys, requests, json, time, struct, base64

DOMAIN = ''
AK = ''
SK = ''
def prepare_request(method, path, ak, sk, params=None, data=None, doseq=0, account_id=None):
    if params:
        for key in params:
            if type(params[key]) == int or type(params[key]) == float or type(params[key]) == bool:
                params[key] = str(params[key])
            elif sys.version_info[0] != 3:
                if type(params[key]) == unicode:
                    params[key] = params[key].encode('utf-8')
            elif type(params[key]) == list:
                if not doseq:
                    params[key] = ','.join(params[key])
    r = Request()
    r.set_shema("http")
    r.set_method(method)
    r.set_connection_timeout(10)
    r.set_socket_timeout(10)
    mheaders = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Host': DOMAIN,
    }
    if account_id is not None:
        mheaders["V-Account-Id"] = account_id
    r.set_headers(mheaders)
    if params:
        r.set_query(params)
    r.set_host(DOMAIN)
    r.set_path(path)
    if data is not None:
        r.set_body(json.dumps(data))
    credentials = Credentials(ak, sk, 'air', 'cn-shanghai')
    SignerV4.sign(r, credentials)
    return r

queue = multiprocessing.Queue(maxsize=10)
event = multiprocessing.Event()
def consumer():
    """消费者函数:从队列中取出数据并处理"""
    
    items = []
    while not event.is_set() or not queue.empty():
        item = queue.get()
        items.append(item)
        if len(items) == 50:
            info_collection_req = prepare_request(method = "POST", 
                    path = "/api/collection/upsert_data",
                    ak = AK, 
                    sk = SK,
                    data = {"collection_name":"","fields":items,"async":True})
            r = requests.request(method=info_collection_req.method,
                url="http://{}{}".format(DOMAIN, info_collection_req.path),
                headers=info_collection_req.headers,
                data=info_collection_req.body,
                timeout=10000
            )
            items = []

    print("Consumer received event. Exiting...")

if __name__ == "__main__":
    # 创建消费者进程
    processors = []
    for i in range(10):
        p = multiprocessing.Process(target=consumer)
        p.start()
        processors.append(p)

    #  准备数据, 
    datas = []
    for i in range(100000):  
        # 压缩向量
        float_array = [0.124135132531424]*1024
        packed_data = struct.pack('f'*len(float_array), *float_array)
        s =  base64.b64encode(packed_data).decode()
        uuid4 = uuid.uuid4()  # 此处用户可修改为自己希望的id
        datas.append({"id": str(uuid4), "text_vertor": s})
        
    
    for data in tqdm.tqdm(datas):
        queue.put(data)

    # 通知消费者停止工作
    event.set()
    for p in processors:
        p.join()

    print("Main process exiting...")

响应消息

执行成功返回:

HTTP/1.1 200 OK
Content-Length: 43
Content-Type: application/json
 
{"code":0,"msg":"success","request_id":"021695029757920fd001de6666600000000000000000002569b8f"}

执行失败返回:

HTTP/1.1 400 OK
Content-Length: 43
Content-Type: application/json
 
{"code":1000005, "msg":"collection not exist", "request_id":"021695029757920fd001de6666600000000000000000002569b8f"}