需要提前安装好 python3.6 及以上版本
wget https://ml-platform-public-examples-cn-beijing.tos-cn-beijing.volces.com/python_sdk_installer/volcengine_ml_platform-1.1.0b2-py3-none-any.whl pip3 install volcengine_ml_platform-1.1.0b2-py3-none-any.whl
在正式使用 SDK 之前需要先完成火山引擎账号的 AK / SK 的本地配置,用以在使用 SDK 访问机器学习平台时的身份校验。
AccessKeyFullAccess
的 IAM 策略。请使用真实的 AK/SK 替换下列方法中的 <your access key> 和 <your secret access key>
mkdir -p $HOME/.volc cat <<EOF > $HOME/.volc/credentials [default] access_key_id = <your access key> secret_access_key = <your secret access key> EOF cat <<EOF > $HOME/.volc/config [default] region = cn-beijing # 填写所在地域,目前仅支持 cn-beijing EOF
import volcengine_ml_platform as vemlp vemlp.init( ak='<your access key>', sk='<your secret access key>', region='cn-beijing', )
export VOLC_ACCESSKEY='<your access key>' export VOLC_SECRETKEY='<your secret access key>' export VOLC_REGION=cn-beijing
class volcengine_ml_platform.pipeline.Pipeline( *, id: Optional[str] = None, name: Optional[str] = None, version_id: Optional[str] = None, version_number: Optional[int] = None, description: Optional[str] = None, pipeline_template: Optional[PipelineTemplate] = None, user_id: Optional[str] = None, account_id: Optional[str] = None, create_time: Optional[str] = None, update_time: Optional[str] = None, extra: Optional[Dict[str, str]] = None, )
通过 id 或 name 获取 pipeline 的最新配置
Parameters
id - Pipeline id
name - Pipeline name
Example
>>> from volcengine_ml_platform.pipeline import Pipeline >>> p = Pipeline.get(id='pipeline_id') >>> print(p) >>> p = Pipeline.get(name='pipeline_name') >>> print(p)
通过配置文件加载 Pipeline
Parameters
filename - 要加载的文件路径
type - 要加载的文件类型,目前只支持 yaml
name - Pipeline 名称
description - Pipeline 描述
Example
>>> from volcengine_ml_platform.pipeline import Pipeline >>> Pipeline.load(...) >>> Pipeline.load(..., name='pipeline_name', description='pipeline_description')
创建 Pipeline
Example
>>> from volcengine_ml_platform.pipeline import Pipeline >>> Pipeline(name='pipeline_name', description='pipeline_description').create() >>> Pipeline.load(...).create()
判断 Pipeline 是否已存在
Example
>>> from volcengine_ml_platform.pipeline import Pipeline >>> Pipeline(id='pipeline_id').exists() >>> Pipeline(name='pipeline_name').exists() >>> Pipeline(id='pipeline_id', name='pipeline_name').exists() >>> Pipeline.load(...).exists()
更新 Pipeline 配置
Parameters
upload_code_progress_bar - 上传代码是否显示进度条,默认不显示
upload_code_timeout - 上传代码超时时间,默认 1h(3600s)
upload_code_copy_links - 当遇到链接文件时,将链接执行的文件副本上传(如链接失效则会上传失败)
upload_code_retain_links - 当遇到链接文件时,将链接文件原封不动上传(如链接指向文件在镜像环境不存在,则可能在运行时报错)
Example
>>> from volcengine_ml_platform.pipeline import Pipeline >>> Pipeline(id='pipeline_id').update() >>> Pipeline(name='pipeline_name').update() >>> Pipeline.load(...).update()
运行 Pipeline,如果 Pipeline.pipeline_template is None
运行云端最新配置,反之运行本地配置
Parameters
launch_name - 运行实例名称,如不传,则自动按照 {Pipeline.name}_{random 5 chars}
格式自动生成
inputs - Pipeline 定义所需要的输入参数
Example
>>> from volcengine_ml_platform.pipeline import Pipeline >>> Pipeline.get(id='pipeline_id').launch() >>> Pipeline.get(name='pipeline_name').launch() >>> Pipeline.load(...).launch()
更新并运行 Pipeline
Parameters
launch_name - 运行实例名称,如不传,则自动按照 {Pipeline.name}_{random 5 chars}
格式自动生成
inputs - Pipeline 定义所需要的输入参数
upload_code_progress_bar - 上传代码是否显示进度条,默认不显示
upload_code_timeout - 上传代码超时时间,默认 1h(3600s)
upload_code_copy_links - 当遇到链接文件时,将链接执行的文件副本上传(如链接失效则会上传失败)
upload_code_retain_links - 当遇到链接文件时,将链接文件原封不动上传(如链接指向文件在镜像环境不存在,则可能在运行时报错)
Example
>>> from volcengine_ml_platform.pipeline import Pipeline >>> Pipeline.get(id='pipeline_id').update_and_launch() >>> Pipeline.get(name='pipeline_name').update_and_launch() >>> Pipeline.load(...).update_and_launch()
打印 Pipeline DAG 图
Parameters
x
横向打印,y
纵向打印Example
>>> Pipeline.get(id='pipeline_id').show() """ + - - - - - - - - - - - - - - - - -+ ' linear_pipeline_with_two_tasks ' ' ' ' +------------------------------+ ' ' | task_a | ' ' +------------------------------+ ' ' | ' ' | ' ' v ' ' +------------------------------+ ' ' | task_b | ' ' +------------------------------+ ' ' ' + - - - - - - - - - - - - - - - - -+ """ >>> Pipeline.get(id='pipeline_id').show(axis='x') """ + - - - - - - - - - - - - - - - - - - - - - - - - - - -+ ' linear_pipeline_with_two_tasks ' ' ' ' +--------+ +--------+ ' ' | task_a | -----------------------------> | task_b | ' ' +--------+ +--------+ ' ' ' + - - - - - - - - - - - - - - - - - - - - - - - - - - -+ """
将 Pipeline 导出到文件
Parameters
output_path - 导出文件路径,如果路径为目录,会自动生成 {pipeline.name}_{yyyyMMdd_hhmmss}.{type}
的文件
type - 导出文件格式,目前仅支持 yaml
Example
>>> from volcengine_ml_platform.pipeline import Pipeline >>> Pipeline.get(id='pipeline_id').dump() >>> Pipeline.get(name='pipeline_name').dump() >>> Pipeline.load(...).dump()
class volcengine_ml_platform.pipeline.PipelineInstance( *, id: Optional[str], name: Optional[str], pipeline_id: Optional[str], pipeline_version_id: Optional[str], pipeline_template: Optional[PipelineTemplate] = None, global_inputs: Optional[List[InputInstance]] = None, global_outputs: Optional[List[OutputInstance]] = None, state: Optional[str] = None, launch_time: Optional[str] = None, finish_time: Optional[str] = None, diag_info: Optional[str] = None, trigger: Optional[str] = None, user_id: Optional[str] = None, account_id: Optional[str] = None, create_time: Optional[str] = None, update_time: Optional[str] = None, extra: Optional[Dict[str, str]] = None, )
通过 id 获取 pipeline 实例的信息
Parameters
Example
>>> from volcengine_ml_platform.pipeline import PipelineInstance >>> PipelineInstance.get('pipeline_instance_id')
通过 id 获取 pipeline 实例的信息
Parameters
output_path - 导出文件路径,如果路径为目录,会自动生成 {pipeline.name}_{yyyyMMdd_hhmmss}.{type}
的文件
type - 导出文件格式,目前仅支持 yaml
Example
>>> from volcengine_ml_platform.pipeline import PipelineInstance >>> PipelineInstance.get('pipeline_instance_id').dump() >>> PipelineInstance.get('pipeline_instance_id').dump('pipeline.yaml')
通过 id 获取 pipeline 实例的信息
Parameters
x
横向打印,y
纵向打印Example
>>> from volcengine_ml_platform.pipeline import PipelineInstance >>> PipelineInstance.get('pipeline_instance_id').show() + - - - - - - - - - - - - - - - - - - - -+ ' linear_pipeline_with_two_tasks_sasnf ' ' ' ' +------------------------------------+ ' ' | task_a(Finished) | ' ' +------------------------------------+ ' ' | ' ' | ' ' v ' ' +------------------------------------+ ' ' | task_b(Running) | ' ' +------------------------------------+ ' ' ' + - - - - - - - - - - - - - - - - - - - -+
请使用真实的队列名称替换下文中的 <Your Resource Queue Name>
import os import sys from volcengine_ml_platform.pipeline import Pipeline yaml_file_path = "<your yaml file path>" pipeline_name = "<your pipeline name>" def main(): pipeline = Pipeline.load(yaml_file_path, name=pipeline_name) # Create a pipeline if not exists # If load after create, the pipeline is empty if not pipeline.exists(): pipeline.create() pipeline_instance_id = pipeline.launch() print(f"Pipeline Id: {pipeline.id}") print(f"Pipeline Instance Id: {pipeline_instance_id}") if __name__ == "__main__": main()
run.sh
文件#!/usr/bin/env bash set -x pwd ls env
import os import sys from volcengine_ml_platform.pipeline import Pipeline yaml_file_path = "<your yaml file path>" pipeline_name = "<your pipeline name>" def main(): pipeline = Pipeline.load(yaml_file_path, name=pipeline_name) # Create a pipeline if not exists # If load after create, the pipeline is empty if not pipeline.exists(): pipeline.create() inputs = {"command": "echo 'hello pipeline'"} pipeline_instance_id = pipeline.launch(inputs=inputs) print(f"Pipeline Id: {pipeline.id}") print(f"Pipeline Instance Id: {pipeline_instance_id}") if __name__ == "__main__": main()
import os import sys from volcengine_ml_platform.pipeline import Pipeline pipeline_name = "<your pipeline name>" export_file_name = "<your export file name>" def main(): pipeline = Pipeline.get(name=pipeline_name) if export_file_name: os.makedirs(os.path.dirname(export_file_name), exist_ok=True) pipeline.dump(export_file_name, type='yaml') else: pipeline.dump(type='yaml') if __name__ == "__main__": main()
import os import sys from volcengine_ml_platform.pipeline import PipelineInstance pipeline_instance_id = "<your pipeline instance id>" export_file_name = "<your export file name>" def main(): pipeline_instance = PipelineInstance.get(pipeline_instance_id) if export_file_name: os.makedirs(os.path.dirname(export_file_name), exist_ok=True) pipeline_instance.dump(export_file_name, type='yaml') else: pipeline_instance.dump(type='yaml') if __name__ == "__main__": main()