按特定条件批量导出Collection中的数据
说明
使用前请先授权 VikingDB 跨服务访问 TOS 去授权
参数名 | 类型 | 必须 | 说明 | 子字段 | 类型 | 必选 | 说明 |
---|---|---|---|---|---|---|---|
task_type | string | 是 | 任务类型,填入data_export | ||||
task_params | map | 是 | 任务参数 | collection_name | string | 是 | Collection 名称,必填 |
filter_conds | map | 是 | 过滤条件,支持 must 、must_nor、range、range_out 算子,使用参考标量过滤
| ||||
export_all | bool | 否 | 导出全部数据,此时filter不生效。默认为false | ||||
tos_path | string | 是 | 将数据文件导入到用户的TOS 路径,格式 :{桶名}/{路径},注意不是域名。必填 | ||||
file_type | string | 否 | 备份文件类型 json 或者 parquet,默认为 parquet |
curl -i -X POST \ -H 'Content-Type: application/json' \ -H 'Authorization: HMAC-SHA256 ***' \ https://api-vikingdb.volces.com/api/task/create \ -d '{ "task_type": "data_export", "tos_path": "bucket_name/dir_name", "task_params": { "collection_name": "example", "filter_conds": [ { "op": "must", "field": "city", "conds": ["beijing"] }, { "op": "range", "field": "user_id", "gt": 0, "lt": 4 }, ] } }'
属性 | 说明 |
---|---|
task_id | uuid |
import tos DOMAIN = "api-vikingdb.volces.com" TOS_ENDPOINT = "tos-cn-beijing.ivolces.com" REGION = "cn-beijing" AK = "****" SK = "****" COLLECTION_NAME = "example" BUCKET_NAME = "bucket_name" TOS_DIR = "tos_dir" def download(client, bucket_name, object_key, local_path): file_path = "{}/{}".format(local_path, object_key) try: client.get_object_to_file(bucket_name, object_key, file_path) except tos.exceptions.TosClientError as e: # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常 return 'fail with client error, message:{}, cause: {}'.format(e.message, e.cause) except tos.exceptions.TosServerError as e: return 'fail with server error : {}'.format(e) except Exception as e: return 'fail with unknown error: {}'.format(e) return '' client = tos.TosClientV2(AK, SK, TOS_ENDPOINT, REGION) download(client, BUCKET_NAME, TOS_DIR, "./")
import pyarrow.parquet as pq def process_parquet(file_path): parquet_file = pq.ParquetFile(file_path) file_data_count = sum(parquet_file.metadata.row_group(i).num_rows for i in range(parquet_file.num_row_groups)) schema = parquet_file.schema.to_arrow_schema() row_iterator = parquet_file.iter_batches(batch_size=100) # 迭代读取数据 for batch in row_iterator: df = batch.to_pandas() for index, row in df.iterrows(): """ do what you want """ print(row) return '' process_parquet("1.parquet")