You need to enable JavaScript to run this app.
导航
Collection 数据过滤导出任务
最近更新时间:2025.04.27 17:35:38首次发布时间:2025.04.27 17:35:38
我的收藏
有用
有用
无用
无用

概述

按特定条件批量导出Collection中的数据

说明

使用前请先授权 VikingDB 跨服务访问 TOS 去授权

请求参数

参数名

类型

必须

说明

子字段

类型

必选

说明

task_type

string

任务类型,填入data_export

task_params

map

任务参数

collection_name

string

Collection 名称,必填

filter_conds

map

过滤条件,支持 must 、must_nor、range、range_out 算子,使用参考标量过滤
过滤更新 city 为 beijing 且 user_id (0, 4) 之间的数据

[
    {
        "op": "must",
        "field": "city",
        "conds": ["beijing"]
    },
    {
        "op": "range",
        "field": "user_id",
        "gt": 0,
        "lt": 4
    },

]

export_all

bool

导出全部数据,此时filter不生效。默认为false

tos_path

string

将数据文件导入到用户的TOS 路径,格式 :{桶名}/{路径},注意不是域名。必填

file_type

string

备份文件类型 json 或者 parquet,默认为 parquet

示例

请求参数

curl -i -X POST \
  -H 'Content-Type: application/json' \
  -H 'Authorization: HMAC-SHA256 ***' \
  https://api-vikingdb.volces.com/api/task/create \
  -d '{
    "task_type": "data_export",
    "tos_path": "bucket_name/dir_name",
    "task_params": {
        "collection_name": "example",
        "filter_conds": [
            {
                "op": "must",
                "field": "city",
                "conds": ["beijing"]
            },
            {
                "op": "range",
                "field": "user_id",
                "gt": 0,
                "lt": 4
            },
        ]
    }
}'

返回值

属性

说明

task_id

uuid

后续处理

1、从 TOS 下载文件

import tos
DOMAIN = "api-vikingdb.volces.com"
TOS_ENDPOINT = "tos-cn-beijing.ivolces.com"
REGION = "cn-beijing"
AK = "****"
SK = "****"
COLLECTION_NAME = "example"
BUCKET_NAME = "bucket_name"
TOS_DIR = "tos_dir"

def download(client, bucket_name, object_key, local_path):
    file_path = "{}/{}".format(local_path, object_key)
    try:
        client.get_object_to_file(bucket_name, object_key, file_path)
    except tos.exceptions.TosClientError as e:
        # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常
        return 'fail with client error, message:{}, cause: {}'.format(e.message, e.cause)
    except tos.exceptions.TosServerError as e:
        return 'fail with server error : {}'.format(e)
    except Exception as e:
        return 'fail with unknown error: {}'.format(e)
    return ''

client = tos.TosClientV2(AK, SK, TOS_ENDPOINT, REGION)
download(client, BUCKET_NAME, TOS_DIR, "./")

2、解析 parquet 类型数据

import pyarrow.parquet as pq
def process_parquet(file_path):
    parquet_file = pq.ParquetFile(file_path)

    file_data_count = sum(parquet_file.metadata.row_group(i).num_rows for i in range(parquet_file.num_row_groups))
    schema = parquet_file.schema.to_arrow_schema()
    row_iterator = parquet_file.iter_batches(batch_size=100)
    # 迭代读取数据
    for batch in row_iterator:
        df = batch.to_pandas()
        for index, row in df.iterrows():
            """
            do what you want
            """
            print(row)
    return ''

process_parquet("1.parquet")