用于批量导出指定数据集中的数据。
本功能包括两个接口:
/task/create 用于在指定的数据集 Collection 按条件批量导出任务。
说明
请求向量数据库 VikingDB 的 OpenAPI 接口时,需要构造签名进行鉴权,详细的 OpenAPI 签名调用方法请参见 API签名调用指南。
URI | /api/task/create | 统一资源标识符 |
---|---|---|
请求方法 | POST | 客户端对向量数据库服务器请求的操作类型 |
请求头 | Content-Type: application/json | 请求消息类型 |
Authorization: HMAC-SHA256 *** | 鉴权 |
字段名 | 类型 | 说明 | 子字段 | 类型 | 说明 |
---|---|---|---|---|---|
task_type | string | 任务类型,必填 |
| ||
task_params | json | 任务参数 | collection_name | string | |
filter | json | 过滤语句,现在想导出 user_id 值在 [1,2,3] 里 并且 city 为 beijing 的数据
| |||
tos_path | string | 将数据文件导入到用户的tos目录 | |||
file_type | string | 导出的文件格式,可选 parquet、json | |||
export_all | bool | 导出全部数据,此时filter不生效 |
字段 | 备注 | 子字段 | 子字段说明 |
---|---|---|---|
code | 状态码 | ||
message | 返回信息 | ||
data | task_id | uuid |
/task/info用于查询任务状态、任务进度等离线任务的详细信息。
URI | /api/task/info | 统一资源标识符 |
---|---|---|
请求方法 | POST | 客户端对向量数据库服务器请求的操作类型 |
请求头 | Content-Type: application/json | 请求消息类型 |
Authorization: HMAC-SHA256 *** | 鉴权 |
字段名 | 类型 | 说明 |
---|---|---|
task_id | string | 任务ID,必填 |
字段 | 备注 | 子字段 | 类型 | 子字段说明 | |||
---|---|---|---|---|---|---|---|
code | 状态码 | ||||||
message | 返回信息 | ||||||
data | task_id | string | 任务ID | ||||
task_type | string | 任务类型 | |||||
task_status | string | 任务状态 | |||||
update_person | string | 任务更新人 | |||||
update_time | string | 任务信息更新时间 | |||||
create_time | string | 任务信息创建时间 | |||||
process_info | map | 任务处理信息,例如进度等。 | sampled_data | list | 采样5条数据用于展示 | ||
sampled_timestamp | int64 | 采样的时间戳,后写入的数据不会被导出 | |||||
err_msg | string | 任务错误信息 | |||||
task_progress | string | 任务进度 例如50% | |||||
total_data_count | int64 | collection 数据总条数(预估) | |||||
total_filter_count | int64 | 数据当前导出量 | |||||
scan_data_count | int64 | 当前扫描数据量 | |||||
task_params | map | 任务配置信息 |
任务状态 | 说明 |
---|---|
init | 任务初始化中 |
queued | 任务正在调度,排队中 |
running | 任务运行中 |
done | 任务已完成 |
fail | 任务失败 |
confirm | 任务需要人工确认 |
confirmed | 任务已经人工确认 |
code | message | 备注 | http status_code |
---|---|---|---|
0 | success | 成功 | 200 |
1000005 | collection not exist | Collection不存在 | 400 |
1000003 | invalid request:%s | 非法参数
| 400 |
1000001 | unauthorized | 缺乏鉴权信息 | 401 |
1000002 | no permission | 权限不足 | 403 |
1000028 | Internal error | 内部错误 | 500 |
1000035 | task not exist | task 不存在 | 404 |
申请开通 tos 服务 https://console.volcengine.com/tos/bucket/create
对向量库进行授权 https://console.volcengine.com/iam/service/attach_role/?ServiceName=ml_platform
import json, ijson import logging import os import pandas as pd import requests from volcengine.auth.SignerV4 import SignerV4 from volcengine.Credentials import Credentials from volcengine.base.Request import Request from volcengine.viking_db import VikingDBService, Collection, common import tos import pyarrow.parquet as pq import numpy as np from pre import * def prepare_request(method, path, ak, sk, params=None, data=None, doseq=0): request = Request() request.set_method(method) request.set_host("api-vikingdb.volces.com") request.set_path(path) mheaders = { 'Accept': 'application/json', 'Content-Type': 'application/json', } request.set_headers(mheaders) if params: request.set_query(params) if data: request.set_body(json.dumps(data)) credentials = Credentials(ak, sk, 'air', 'cn-north-1') SignerV4.sign(request, credentials) return request DOMAIN = "api-vikingdb.volces.com" AK = "****" SK = "****" COLLECTION_NAME = "example" BUCKET_NAME = "data-export" TOS_DIR = "task_example" task_type = "data_export" file_type = "parquet" task_param = { "collection_name": COLLECTION_NAME, "tos_path": "{}/{}".format(BUCKET_NAME, TOS_DIR), "filter" : { "user_id": [5,9], "city": ["beijing"] } } req_info = prepare_request(method = "POST", path = "/api/task/create", ak = AK, sk = SK, data ={"task_type":task_type, "task_params":task_param}) r = requests.request(method=req_info.method, url="http://{}{}".format(DOMAIN, req_info.path), headers=req_info.headers, data=req_info.body ) task_id = json.loads(r.text)['data']['task_id']
def download(client, bucket_name, object_key, local_path): file_path = "{}/{}".format(local_path, object_key) try: client.get_object_to_file(bucket_name, object_key, file_path) except tos.exceptions.TosClientError as e: # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常 return 'fail with client error, message:{}, cause: {}'.format(e.message, e.cause) except tos.exceptions.TosServerError as e: return 'fail with server error : {}'.format(e) except Exception as e: return 'fail with unknown error: {}'.format(e) return '' client = tos.TosClientV2(AK, SK, TOS_ENDPOINT, REGION) download(client, BUCKET_NAME, "tos-file-path", LOCAL_FILE_PATH)
def process_parquet(file_path): parquet_file = pq.ParquetFile(file) file_data_count = sum(parquet_file.metadata.row_group(i).num_rows for i in range(parquet_file.num_row_groups)) schema = parquet_file.schema.to_arrow_schema() row_iterator = parquet_file.iter_batches(batch_size=100) # 迭代读取数据 for batch in row_iterator: df = batch.to_pandas() for index, row in df.iterrows(): """ do what you want """ print(row) return '' process_parquet("1.parquet")