You need to enable JavaScript to run this app.
导航
upsertData
最近更新时间:2024.11.13 15:35:35首次发布时间:2024.04.17 14:21:07

概述

upsertData 用于在指定的数据集 Collection 内写入数据。指定写入的数据是一个数组,允许单次插入一条数据或者多条数据,单次最多可插入100条数据。

说明

当前不支持更新部分字段,每次写入数据都要更新所有字段。写入数据时,如果 Collection 中已存在相同主键的数据,则会覆盖源数据;如果 Collection 中没有相同主键的数据,则会写入新数据。

请求参数

参数名

子参数

类型

是否必选

参数说明

DataObject

说明

DataObject实例或者实例列表。

fields

array<map>

指定写入的数据。

  • 单次写入的数据数目不超过100。
  • 每条数据作为一个 map,key 为字段名,value 为字段值。
  • 数据写入时 fields 长度最大为65535,超过限制时会返回报错 “fields data is too long, should be less than 65535”。
  • 不同字段类型的字段值格式如下:
    • int64:格式是整型数值。
    • float:格式是浮点数值。
    • string:格式是字符串。
    • bool:格式是 true/false。
    • list<string>:格式是字符串数组。
    • list<int64>:格式是整型数组。
    • vector:格式是向量(浮点数数组)。
    • sparse_vector:格式是 json 字典,k 为 string 类型,表示关键词的字面量,v 为 float 类型,表示该关键词的权重数值。
    • text:格式是 map<string, string>,当前支持 text 。
      • text:以 string 形式写入文本原始数据, 如 {"text": "hello world"}。

TTL

int

数据过期时间,单位为秒。

  • 格式:0 和正整数。
  • 默认值:默认为0,表示数据不过期。
  • 当 ttl 设置为86400时,表示1天后数据自动删除。
  • 数据 ttl 删除,不会立刻更新到索引。

async_upsert

bool

是否异步请求接口,适用于大规模数据的写入场景,性能提升10倍。

  • True:表明异步请求写入。
  • 默认值:默认为False,表示正常请求。

示例

请求参数

//getCollection获取指定数据集,程序初始化时调用即可,无需重复调用                  
Collection collection = vikingDBService.getCollection("javaSDKTest");
//构建向量
HashMap<String,Object> field1 = new HashMap<String,Object>();
List<String> author1 = new ArrayList<>();
author1.add("name1");
author1.add("name2");
field1.put("doc_id", "11");
field1.put("text_vector", genRandomVector(12));
field1.put("text_sparse_vector", {"hello": 0.34, "world": 0.03, "!": 0.11});
field1.put("like", 1);
field1.put("price", 1.11);
field1.put("aim", true);
field1.put("author", author1);
DataObject dataObject1 = new DataObject()
                        .setFields(field1)
                        .setTTL(200000)
                        .build();
HashMap<String,Object> field2 = new HashMap<String,Object>();
List<String> author2 = new ArrayList<>();
author2.add("name3");
author2.add("name4");
field2.put("doc_id", "22");
field2.put("text_vector", genRandomVector(12));
field2.put("text_sparse_vector", {"hi": 0.12, "there": 0.043, "!": 0.5});
field2.put("like", 2);
field2.put("price", 1.11);
field2.put("aim", false);
field2.put("author", author2);
DataObject dataObject2 = new DataObject()
                        .setFields(field2)
                        .build();
HashMap<String,Object> field3 = new HashMap<String,Object>();
List<String> author3 = new ArrayList<>();
author3.add("name5");
author3.add("name6");
field3.put("doc_id", "33");
field3.put("text_vector", genRandomVector(12));
field3.put("text_sparse_vector", {"hi": 0.12, "there": 0.043, "!": 0.5});
field3.put("like", 3);
field3.put("price", 3.33);
field3.put("aim", false);
field3.put("author", author3);
DataObject dataObject3 = new DataObject()
                        .setFields(field3)
                        .setTTL(6000000)
                        .build();
HashMap<String,Object> field4 = new HashMap<String,Object>();
List<String> author4 = new ArrayList<>();
author4.add("name7");
author4.add("name8");
field4.put("doc_id", "44");
field4.put("text_vector", genRandomVector(12));
field4.put("text_sparse_vector", {"hi": 0.12, "there": 0.043, "!": 0.5});
field4.put("like", 4);
field4.put("price", 4.44);
field4.put("aim", false);
field4.put("author", author4);
DataObject dataObject4 = new DataObject()
                        .setFields(field4)
                        .build();

List<DataObject> dataObjects = new ArrayList<>();
dataObjects.add(dataObject1);
dataObjects.add(dataObject2);
dataObjects.add(dataObject3);
dataObjects.add(dataObject4);
collection.upsertData(dataObjects);

异步写入数据示例:

class Consumer implements Runnable {
    private final LinkedBlockingQueue<DataObject> queue;
    private final AtomicBoolean event;
    
    public Consumer(LinkedBlockingQueue<DataObject> queue, AtomicBoolean event) {
        this.queue = queue;
        this.event = event;
    }

    @Override
    public void run() {
        VikingDBService vikingdbService = null;
        try {
            vikingdbService = new VikingDBService();
        } catch (Exception e) {
            e.printStackTrace();
        }
        Collection collection = null;
        try {
            collection = vikingdbService.getCollection("");
        } catch (Exception e) {
            e.printStackTrace();
        }
        List<DataObject> items = new ArrayList<>();
        
        while (!event.get() || !queue.isEmpty()) {
            try {
                DataObject item = queue.poll(1, TimeUnit.SECONDS);
                if (item != null) {
                    items.add(item);
                    if (items.size() == 50) {
                        try {
                            collection.upsertData(items, true);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        items.clear();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        System.out.println("Consumer received event. Exiting...");
    }
}

public class test {
    public static List<Double> genRandomVector(int dim){
        List<Double> res = new ArrayList<>();
        for(int i=0;i<dim;i++){
            res.add(new Random().nextDouble());
        }
        return res;
    }
    
    public static void main(String[] args) throws Exception {
        VikingDBService vikingDBService = new VikingDBService();
        LinkedBlockingQueue<DataObject> queue = new LinkedBlockingQueue<>(10);
        AtomicBoolean event = new AtomicBoolean(false);
        List<Thread> processors = new ArrayList<>();
        
        for (int i = 0; i < 10; i++) {
            Thread p = new Thread(new Consumer(queue, event));
            p.start();
            processors.add(p);
        }
        int num = 10000;

        List<DataObject> datas = new ArrayList<>();
        for (int i = 0; i < num; i++) {
            float[] floatArray = new float[1024];
            Arrays.fill(floatArray, 0.124135132531424f);
            ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * Float.BYTES);
            byteBuffer.asFloatBuffer().put(floatArray);
            String s = Base64.getEncoder().encodeToString(byteBuffer.array());
            UUID uuid4 = UUID.randomUUID(); 

            HashMap<String,Object> field = new HashMap<String,Object>();
            field.put("id", uuid4);
            field.put("text_vector", s);
            DataObject dataObject = new DataObject()
               .setFields(field)
               .build();
            datas.add(dataObject);
        }

        ProgressBar pb = new ProgressBar("Processing", num);
        for (DataObject data : datas) {
            queue.put(data);
            pb.step();
        }
        pb.close();

        event.set(true);
        for (Thread p : processors) {
            p.join();
        }
        System.out.println("Main process exiting...");
     }
 }

返回值

Java 调用执行上面的任务,执行成功无返回信息。