upsertData 用于在指定的数据集 Collection 内写入数据。指定写入的数据是一个数组,允许单次插入一条数据或者多条数据,单次最多可插入100条数据。
说明
当前不支持更新部分字段,每次写入数据都要更新所有字段。写入数据时,如果 Collection 中已存在相同主键的数据,则会覆盖原有数据;如果 Collection 中没有相同主键的数据,则会写入新数据。
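| 参数名 | 子参数 | 类型 | 是否必选 | 参数说明 |
|---|---|---|---|---|
| DataObject | — | DataObject 或 List<DataObject> | 是 | DataObject 实例或者实例列表。 |
| | fields | array<map> | 是 | 指定写入的数据。 |
| | TTL | int | 否 | 数据过期时间,单位为秒。 |
| async_upsert | — | bool | 否 | 是否异步请求接口,适用于大规模数据的写入场景,性能提升10倍。Java SDK 中作为 upsertData 的第二个参数传入,例如 `collection.upsertData(items, true)`。 |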
//getCollection获取指定数据集,程序初始化时调用即可,无需重复调用 Collection collection = vikingDBService.getCollection("javaSDKTest");
//构建向量 HashMap<String,Object> field1 = new HashMap<String,Object>(); List<String> author1 = new ArrayList<>(); author1.add("name1"); author1.add("name2"); field1.put("doc_id", "11"); field1.put("text_vector", genRandomVector(12)); field1.put("text_sparse_vector", {"hello": 0.34, "world": 0.03, "!": 0.11}); field1.put("like", 1); field1.put("price", 1.11); field1.put("aim", true); field1.put("author", author1); DataObject dataObject1 = new DataObject() .setFields(field1) .setTTL(200000) .build(); HashMap<String,Object> field2 = new HashMap<String,Object>(); List<String> author2 = new ArrayList<>(); author2.add("name3"); author2.add("name4"); field2.put("doc_id", "22"); field2.put("text_vector", genRandomVector(12)); field2.put("text_sparse_vector", {"hi": 0.12, "there": 0.043, "!": 0.5}); field2.put("like", 2); field2.put("price", 1.11); field2.put("aim", false); field2.put("author", author2); DataObject dataObject2 = new DataObject() .setFields(field2) .build(); HashMap<String,Object> field3 = new HashMap<String,Object>(); List<String> author3 = new ArrayList<>(); author3.add("name5"); author3.add("name6"); field3.put("doc_id", "33"); field3.put("text_vector", genRandomVector(12)); field3.put("text_sparse_vector", {"hi": 0.12, "there": 0.043, "!": 0.5}); field3.put("like", 3); field3.put("price", 3.33); field3.put("aim", false); field3.put("author", author3); DataObject dataObject3 = new DataObject() .setFields(field3) .setTTL(6000000) .build(); HashMap<String,Object> field4 = new HashMap<String,Object>(); List<String> author4 = new ArrayList<>(); author4.add("name7"); author4.add("name8"); field4.put("doc_id", "44"); field4.put("text_vector", genRandomVector(12)); field4.put("text_sparse_vector", {"hi": 0.12, "there": 0.043, "!": 0.5}); field4.put("like", 4); field4.put("price", 4.44); field4.put("aim", false); field4.put("author", author4); DataObject dataObject4 = new DataObject() .setFields(field4) .build(); List<DataObject> dataObjects = new ArrayList<>(); 
dataObjects.add(dataObject1); dataObjects.add(dataObject2); dataObjects.add(dataObject3); dataObjects.add(dataObject4); collection.upsertData(dataObjects);
异步写入数据示例:
/**
 * Drains {@link DataObject}s from a shared queue and writes them to a collection in batches,
 * using asynchronous upserts ({@code upsertData(batch, true)}).
 *
 * <p>A consumer keeps polling until the producer has set {@code event} AND the queue is empty.
 * It then flushes any partially filled batch before exiting, so no queued row is dropped.
 */
class Consumer implements Runnable {
    /** Placeholder collection name used by this example; replace it with your collection's name. */
    static final String DEFAULT_COLLECTION_NAME = "";

    /** Rows per upsertData call; must stay within the 100-rows-per-call limit of upsertData. */
    private static final int BATCH_SIZE = 50;

    private final LinkedBlockingQueue<DataObject> queue;
    private final AtomicBoolean event;
    /** Collection to write to; {@code null} means "open one in run()". */
    private final Collection collection;

    /**
     * Creates a consumer that opens its own collection ({@link #DEFAULT_COLLECTION_NAME}) in
     * {@link #run()}. If opening fails, the error is printed and the consumer exits without
     * draining the queue; prefer the three-argument constructor so failures surface in the producer.
     */
    public Consumer(LinkedBlockingQueue<DataObject> queue, AtomicBoolean event) {
        this(queue, event, null);
    }

    /**
     * @param queue      rows produced by the main thread
     * @param event      set to {@code true} by the producer once it has enqueued everything
     * @param collection already-opened collection to write to, or {@code null} to open one lazily
     */
    public Consumer(LinkedBlockingQueue<DataObject> queue, AtomicBoolean event, Collection collection) {
        this.queue = queue;
        this.event = event;
        this.collection = collection;
    }

    @Override
    public void run() {
        Collection target = (collection != null) ? collection : openCollection();
        if (target == null) {
            // Opening failed (already reported); without a collection nothing can be written.
            return;
        }
        List<DataObject> batch = new ArrayList<>(BATCH_SIZE);
        while (!event.get() || !queue.isEmpty()) {
            DataObject item;
            try {
                item = queue.poll(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the flag and stop: polling again would throw immediately and spin forever.
                Thread.currentThread().interrupt();
                break;
            }
            if (item == null) {
                continue;
            }
            batch.add(item);
            if (batch.size() >= BATCH_SIZE) {
                batch = flush(target, batch);
            }
        }
        // Write the trailing partial batch; without this, up to BATCH_SIZE - 1 rows would be lost.
        if (!batch.isEmpty()) {
            flush(target, batch);
        }
        System.out.println("Consumer received event. Exiting...");
    }

    /** Opens the placeholder collection with a dedicated service; returns null (after printing) on failure. */
    private static Collection openCollection() {
        try {
            return new VikingDBService().getCollection(DEFAULT_COLLECTION_NAME);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Sends one batch with async upsert enabled and returns a fresh, empty batch. A new list is
     * returned instead of clearing, so the list handed to upsertData is never mutated afterwards.
     */
    private static List<DataObject> flush(Collection target, List<DataObject> batch) {
        try {
            target.upsertData(batch, true);
        } catch (Exception e) {
            // Best-effort: report the failed batch and keep consuming instead of killing the thread.
            System.err.println("upsertData failed for a batch of " + batch.size() + " rows");
            e.printStackTrace();
        }
        return new ArrayList<>(BATCH_SIZE);
    }
}

/** Producer side of the async upsert example: enqueues rows for a pool of {@link Consumer}s. */
public class test {
    /** Number of consumer threads. */
    private static final int CONSUMER_COUNT = 10;

    /** Returns a list of {@code dim} random doubles in [0, 1). */
    public static List<Double> genRandomVector(int dim) {
        Random random = new Random();
        List<Double> res = new ArrayList<>(dim);
        for (int i = 0; i < dim; i++) {
            res.add(random.nextDouble());
        }
        return res;
    }

    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue<DataObject> queue = new LinkedBlockingQueue<>(10);
        AtomicBoolean event = new AtomicBoolean(false);

        // Open every consumer's collection BEFORE starting any thread, so a bad configuration
        // fails fast here instead of leaving the producer blocked on a queue nobody drains.
        List<Collection> collections = new ArrayList<>(CONSUMER_COUNT);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            collections.add(new VikingDBService().getCollection(Consumer.DEFAULT_COLLECTION_NAME));
        }
        List<Thread> processors = new ArrayList<>(CONSUMER_COUNT);
        for (Collection collection : collections) {
            Thread p = new Thread(new Consumer(queue, event, collection));
            p.start();
            processors.add(p);
        }

        // Every row uses the same 1024-dim vector, so encode it to base64 once.
        float[] floatArray = new float[1024];
        Arrays.fill(floatArray, 0.124135132531424f);
        ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * Float.BYTES);
        // NOTE(review): ByteBuffer is big-endian by default; confirm the byte order the service
        // expects for base64-encoded vectors.
        byteBuffer.asFloatBuffer().put(floatArray);
        String encodedVector = Base64.getEncoder().encodeToString(byteBuffer.array());

        int num = 10000;
        List<DataObject> datas = new ArrayList<>(num);
        for (int i = 0; i < num; i++) {
            HashMap<String, Object> field = new HashMap<>();
            // Send the key as a String instead of relying on how the SDK serializes java.util.UUID.
            field.put("id", UUID.randomUUID().toString());
            field.put("text_vector", encodedVector);
            DataObject dataObject = new DataObject()
                    .setFields(field)
                    .build();
            datas.add(dataObject);
        }

        ProgressBar pb = new ProgressBar("Processing", num);
        try {
            for (DataObject data : datas) {
                queue.put(data);
                pb.step();
            }
        } finally {
            pb.close();
            // Always signal completion, even if interrupted, so consumers can drain and exit.
            event.set(true);
        }
        for (Thread p : processors) {
            p.join();
        }
        System.out.println("Main process exiting...");
    }
}
Java 调用执行上面的任务,执行成功无返回信息。