import multiprocessing
import struct, base64, uuid, tqdm, time
from volcengine.viking_db import *
# Bounded hand-off between the producer (main process) and the consumers;
# maxsize=10 makes queue.put() block when the consumers fall behind.
queue = multiprocessing.Queue(maxsize=10)
# Set by the producer once every record has been enqueued.
event = multiprocessing.Event()


def consumer(task_queue=None, stop_event=None, batch_size=50):
    """Consumer: take records from the queue and upsert them in batches.

    Runs until ``stop_event`` is set and a full one-second ``get()`` then
    returns nothing; any final partial batch is flushed before returning.

    Args:
        task_queue: queue to read records from; defaults to the module-level
            ``queue``. Pass it explicitly (e.g. via ``Process(args=...)``)
            under the "spawn"/"forkserver" start methods, where a child
            re-imports this module and would otherwise get its own,
            unconnected queue.
        stop_event: set by the producer once everything has been enqueued;
            defaults to the module-level ``event``.
        batch_size: number of records sent per ``upsert_data`` call.
    """
    # stdlib module, needed only for ``Empty``; the global name ``queue`` is
    # the shared multiprocessing.Queue, so it is imported under another name.
    import queue as queue_errors

    if task_queue is None:
        task_queue = queue
    if stop_event is None:
        stop_event = event

    vikingdb_service = VikingDBService()
    vikingdb_service.set_ak("ak")
    vikingdb_service.set_sk("sk")
    collection = vikingdb_service.get_collection("")

    items = []
    while True:
        try:
            item = task_queue.get(timeout=1)
        except queue_errors.Empty:
            # A get() without timeout hangs forever once the producer is done:
            # several consumers can see a non-empty queue and then race for the
            # last item. Stop only when the producer has finished AND a whole
            # poll interval passed without data, which also gives the queue's
            # feeder thread time to flush the producer's final puts.
            if stop_event.is_set():
                break
            continue
        items.append(item)
        if len(items) >= batch_size:
            collection.upsert_data(items, async_upsert=True)
            items = []
    # Flush the final partial batch, which would otherwise be dropped.
    if items:
        collection.upsert_data(items, async_upsert=True)
    print("Consumer received event. Exiting...")


def encode_vector(values):
    """Return *values* packed as little-endian float32 and base64-encoded.

    The byte order is explicit so the result does not depend on the host;
    a plain ``struct.pack('f' * n)`` uses native byte order.
    """
    packed = struct.pack(f"<{len(values)}f", *values)
    return base64.b64encode(packed).decode()


if __name__ == "__main__":
    # Create the consumer processes. queue/event are passed explicitly because
    # under the "spawn"/"forkserver" start methods each child re-imports this
    # module and would otherwise see its own, unconnected queue and event.
    processors = []
    for _ in range(10):
        p = multiprocessing.Process(target=consumer, args=(queue, event))
        p.start()
        processors.append(p)

    # Prepare the data. Every record carries the same 1024-dim vector, so it
    # is encoded (compressed) once instead of once per record.
    s = encode_vector([0.124135132531424] * 1024)
    datas = [
        # uuid4 ids; replace with your own id scheme if desired.
        Data({"id": str(uuid.uuid4()), "text_vector": s})
        for _ in range(100000)
    ]
    for data in tqdm.tqdm(datas):
        queue.put(data)
    event.set()  # tell the consumers no more data is coming
    for p in processors:
        p.join()
    print("Main process exiting...")
/**
 * Drains records from a shared queue and upserts them into a VikingDB collection in batches.
 * Stops once the producer has set {@code event} and the queue is empty, flushing any partial batch.
 */
class Consumer implements Runnable {
    // Records sent per upsert request.
    private static final int BATCH_SIZE = 50;

    private final LinkedBlockingQueue<DataObject> queue;
    private final AtomicBoolean event;

    /**
     * @param queue bounded queue filled by the producer
     * @param event set to true by the producer after its last {@code put}
     */
    public Consumer(LinkedBlockingQueue<DataObject> queue, AtomicBoolean event) {
        this.queue = queue;
        this.event = event;
    }

    @Override
    public void run() {
        Collection collection = openCollection();
        List<DataObject> items = new ArrayList<>();
        // Keep going until the producer has finished AND the queue is drained.
        while (!event.get() || !queue.isEmpty()) {
            DataObject item;
            try {
                // Bounded wait so the stop condition is re-checked at least once per second.
                item = queue.poll(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the flag and stop: continuing would make every later poll throw at once.
                Thread.currentThread().interrupt();
                break;
            }
            if (item == null) {
                continue;
            }
            items.add(item);
            if (items.size() >= BATCH_SIZE) {
                upsert(collection, items);
                items.clear();
            }
        }
        // Flush the final partial batch, which would otherwise be silently dropped.
        if (!items.isEmpty()) {
            upsert(collection, items);
        }
        System.out.println("Consumer received event. Exiting...");
    }

    /** Creates the service and looks up the collection; logs and returns null on failure. */
    private static Collection openCollection() {
        try {
            VikingDBService vikingdbService = new VikingDBService();
            return vikingdbService.getCollection("");
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Sends one batch. Failures are logged and the batch is dropped so that the consumer keeps
     * draining the queue and the producer is never blocked on a full queue.
     */
    private static void upsert(Collection collection, List<DataObject> batch) {
        if (collection == null) {
            System.err.println("No collection available; dropping " + batch.size() + " items");
            return;
        }
        try {
            collection.upsertData(batch, true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class test {
    /**
     * Returns {@code dim} pseudo-random doubles in [0, 1); a non-positive {@code dim} yields an
     * empty list.
     */
    public static List<Double> genRandomVector(int dim) {
        // One generator for the whole vector instead of a new Random per element.
        Random random = new Random();
        List<Double> res = new ArrayList<>(Math.max(dim, 0));
        for (int i = 0; i < dim; i++) {
            res.add(random.nextDouble());
        }
        return res;
    }

    /**
     * Starts 10 consumer threads, feeds them {@code num} records through a bounded queue, then
     * signals completion and waits for the consumers to exit.
     */
    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue<DataObject> queue = new LinkedBlockingQueue<>(10);
        AtomicBoolean event = new AtomicBoolean(false);

        // Start the consumers first so they drain the bounded queue while data is produced.
        List<Thread> processors = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Thread p = new Thread(new Consumer(queue, event));
            p.start();
            processors.add(p);
        }

        // Every record carries the same vector, so encode it once: 1024 float32 values as
        // little-endian bytes, base64-encoded. ByteBuffer defaults to BIG_ENDIAN, so the order is
        // set explicitly (before creating the float view, which inherits it).
        int num = 10000;
        float[] floatArray = new float[1024];
        Arrays.fill(floatArray, 0.124135132531424f);
        ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * Float.BYTES)
                .order(java.nio.ByteOrder.LITTLE_ENDIAN);
        byteBuffer.asFloatBuffer().put(floatArray);
        String s = Base64.getEncoder().encodeToString(byteBuffer.array());

        List<DataObject> datas = new ArrayList<>(num);
        for (int i = 0; i < num; i++) {
            HashMap<String, Object> field = new HashMap<>();
            field.put("id", UUID.randomUUID().toString());
            field.put("text_vector", s);
            datas.add(new DataObject().setFields(field).build());
        }

        ProgressBar pb = new ProgressBar("Processing", num);
        for (DataObject data : datas) {
            queue.put(data);
            pb.step();
        }
        pb.close();
        // All records are enqueued; consumers exit once the queue is drained.
        event.set(true);
        for (Thread p : processors) {
            p.join();
        }
        System.out.println("Main process exiting...");
    }
}
// wg counts the running consumer goroutines; main waits on it before exiting.
var wg sync.WaitGroup

// consumer reads records from queue and upserts them into the collection in
// batches of batchSize. It returns once stopChan is closed and the records
// still buffered in queue have been drained and flushed, or once queue itself
// is closed.
func consumer(queue <-chan vikingdb.Data, vikingDBService *vikingdb.VikingDBService, stopChan <-chan struct{}) {
	defer wg.Done()
	const batchSize = 50

	collection, err := vikingDBService.GetCollection("")
	if err != nil {
		// Keep draining the queue so the producer is never blocked forever,
		// but never call UpsertData on a collection that failed to load.
		fmt.Println(err)
	}
	haveCollection := err == nil

	items := make([]vikingdb.Data, 0, batchSize)
	// flush sends the pending batch (if any) and resets it for reuse.
	flush := func() {
		if len(items) == 0 {
			return
		}
		if !haveCollection {
			fmt.Printf("no collection available; dropping %d items\n", len(items))
		} else if upsertErr := collection.UpsertData(items, vikingdb.WithAsyncUpsert(true)); upsertErr != nil {
			fmt.Println(upsertErr)
		}
		items = items[:0]
	}
	// add appends one record and flushes when the batch is full.
	add := func(item vikingdb.Data) {
		items = append(items, item)
		if len(items) == batchSize {
			flush()
		}
	}

	for {
		select {
		case item, ok := <-queue:
			if !ok {
				flush()
				return
			}
			add(item)
		case <-stopChan:
			// select picks randomly among ready cases, so stopChan can win while
			// records are still buffered in queue. Drain them without blocking
			// before exiting; this assumes the producer closes stopChan only
			// after its final send.
			for {
				select {
				case item, ok := <-queue:
					if !ok {
						flush()
						return
					}
					add(item)
				default:
					flush()
					return
				}
			}
		}
	}
}
// main starts 10 consumer goroutines, feeds them num records through a
// bounded channel, closes stopChan after the last send and waits for the
// consumers to finish.
func main() {
	queue := make(chan vikingdb.Data, 10)
	stopChan := make(chan struct{})
	vikingDBService := vikingdb.NewVikingDBService()

	// Create the consumers first so they drain the bounded channel while data is produced.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go consumer(queue, vikingDBService, stopChan)
	}

	// Prepare the data. Every record carries the same vector, so encode it
	// once: 1024 float32 values as little-endian bytes, base64-encoded.
	num := 10000
	floatArray := make([]float32, 1024)
	for j := range floatArray {
		floatArray[j] = 0.124135132531424
	}
	packedData := make([]byte, len(floatArray)*4)
	for j, v := range floatArray {
		binary.LittleEndian.PutUint32(packedData[j*4:], math.Float32bits(v))
	}
	s := base64.StdEncoding.EncodeToString(packedData)

	datas := make([]vikingdb.Data, 0, num)
	for i := 0; i < num; i++ {
		datas = append(datas, vikingdb.Data{Fields: map[string]interface{}{
			"id":          uuid.New().String(),
			"text_vector": s,
		}})
	}

	progressBar := pb.StartNew(num)
	for _, data := range datas {
		queue <- data
		progressBar.Increment()
	}
	// Closed only after the final send has completed, so every record is
	// already in the channel buffer when the consumers are told to stop.
	close(stopChan)
	progressBar.Finish()
	// Wait for all consumers to finish their work.
	wg.Wait()
	fmt.Println("Main process exiting...")
}