You need to enable JavaScript to run this app.
导航
批量导出数据
最近更新时间:2024.12.13 20:45:11首次发布时间:2024.12.13 20:45:11

概述

用于批量导出指定数据集中的数据。
本功能包括两个接口:

  • /task/create 创建导出任务
  • /task/info 查询任务信息

接口列表

创建导出任务(create)

/task/create 用于在指定的数据集 Collection 按条件批量导出任务。

请求接口

说明

请求向量数据库 VikingDB 的 OpenAPI 接口时,需要构造签名进行鉴权,详细的 OpenAPI 签名调用方法请参见 API签名调用指南

URI

/api/task/create

统一资源标识符

请求方法

POST

客户端对向量数据库服务器请求的操作类型

请求头

Content-Type: application/json

请求消息类型

Authorization: HMAC-SHA256 ***

鉴权

请求参数

字段名

类型

说明

子字段

类型

说明

task_type

string

任务类型,必填

  • data_export

task_params

json

任务参数

collection_name

string

filter

json

过滤语句,现在想导出 user_id 值在 [1,2,3] 里 并且 city 为 beijing 的数据
字段列表为且关系,字段的过滤值列表为或关系。
暂时只支持 int64, list, string, list 的导出

{
    "user_id":[1,2,3],
    "city":["beijing"]
}

tos_path

string

将数据文件导入到用户的tos目录

file_type

string

导出的文件格式,可选 parquet、json

export_all

bool

导出全部数据,此时filter不生效

响应消息

字段

备注

子字段

子字段说明

code

状态码

message

返回信息

data

task_id

uuid

任务运行信息(info)

/task/info用于查询任务状态任务进度等离线任务的详细信息。

请求接口

URI

/api/task/info

统一资源标识符

请求方法

POST

客户端对向量数据库服务器请求的操作类型

请求头

Content-Type: application/json

请求消息类型

Authorization: HMAC-SHA256 ***

鉴权

请求参数

字段名

类型

说明

task_id

string

任务ID,必填

响应消息

字段

备注

子字段

类型

子字段说明

code

状态码

message

返回信息

data

task_id

string

任务ID

task_type

string

任务类型

task_status

string

任务状态

update_person

string

任务更新人

update_time

string

任务信息更新时间

create_time

string

任务信息创建时间

process_info

map

任务处理信息,例如进度等。

sampled_data

list

采样5条数据用于展示

sampled_timestamp

int64

采样的时间戳,后写入的数据不会被导出

err_msg

string

任务错误信息

task_progress

string

任务进度 例如50%

total_data_count

int64

collection 数据总条数(预估)

total_filter_count

int64

数据当前导出量

scan_data_count

int64

当前扫描数据量

task_params

map

任务配置信息

任务状态说明

任务状态

说明

init

任务初始化中

queued

任务正在调度,排队中

running

任务运行中

done

任务已完成

fail

任务失败

confirm

任务需要人工确认

confirmed

任务已经人工确认

错误码说明

code

message

备注

http status_code

0

success

成功

200

1000005

collection not exist

Collection不存在

400

1000003

invalid request:%s

非法参数

  • 缺失必选参数
  • 字段值与字段类型不匹配

400

1000001

unauthorized

缺乏鉴权信息

401

1000002

no permission

权限不足

403

1000028

Internal error

内部错误

500

1000035

task not exist

task 不存在

404

示例

1. 申请 tos 服务,并对向量库进行授权

申请开通 tos 服务 https://console.volcengine.com/tos/bucket/create
对向量库进行授权 https://console.volcengine.com/iam/service/attach_role/?ServiceName=ml_platform

2. 创建导出任务

import json, ijson
import logging
import os
import pandas as pd
import requests
from volcengine.auth.SignerV4 import SignerV4
from volcengine.Credentials import Credentials
from volcengine.base.Request import Request
from volcengine.viking_db import VikingDBService, Collection, common
import tos
import pyarrow.parquet as pq
import numpy as np
from pre import *

def prepare_request(method, path, ak, sk, params=None, data=None, doseq=0):
    request = Request()
    request.set_method(method)
    request.set_host("api-vikingdb.volces.com")
    request.set_path(path)
    mheaders = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }
    request.set_headers(mheaders)
    if params:
        request.set_query(params)
    if data:
        request.set_body(json.dumps(data))
    credentials = Credentials(ak, sk, 'air', 'cn-north-1')
    SignerV4.sign(request, credentials)
    return request

DOMAIN = "api-vikingdb.volces.com"
AK = "****"
SK = "****"
COLLECTION_NAME = "example"
BUCKET_NAME = "data-export"
TOS_DIR = "task_example"
task_type = "data_export"
file_type = "parquet"

task_param = {
    "collection_name": COLLECTION_NAME,
    "tos_path": "{}/{}".format(BUCKET_NAME, TOS_DIR),
    "filter" : {
        "user_id": [5,9],
        "city": ["beijing"]
    }
}

req_info = prepare_request(method = "POST", 
                    path = "/api/task/create",
                    ak = AK, 
                    sk = SK,
                    data ={"task_type":task_type, "task_params":task_param})
r = requests.request(method=req_info.method,
        url="http://{}{}".format(DOMAIN, req_info.path),
        headers=req_info.headers,
        data=req_info.body
    )
task_id = json.loads(r.text)['data']['task_id']

3. 扫描从 tos 中下载文件

def download(client, bucket_name, object_key, local_path):
    file_path = "{}/{}".format(local_path, object_key)
    try:
        client.get_object_to_file(bucket_name, object_key, file_path)
    except tos.exceptions.TosClientError as e:
        # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常
        return 'fail with client error, message:{}, cause: {}'.format(e.message, e.cause)
    except tos.exceptions.TosServerError as e:
        return 'fail with server error : {}'.format(e)
    except Exception as e:
        return 'fail with unknown error: {}'.format(e)
    return ''


client = tos.TosClientV2(AK, SK, TOS_ENDPOINT, REGION)
download(client, BUCKET_NAME, "tos-file-path", LOCAL_FILE_PATH)

4. 解析parquet类型数据

def process_parquet(file_path):
    parquet_file = pq.ParquetFile(file)

    file_data_count = sum(parquet_file.metadata.row_group(i).num_rows for i in range(parquet_file.num_row_groups))
    schema = parquet_file.schema.to_arrow_schema()
    row_iterator = parquet_file.iter_batches(batch_size=100)
    # 迭代读取数据
    for batch in row_iterator:
        df = batch.to_pandas()
        for index, row in df.iterrows():
            """
            do what you want
            """
            print(row)
    return ''

process_parquet("1.parquet")