You need to enable JavaScript to run this app.
导航
Python SDK
最近更新时间:2024.06.13 21:31:35首次发布时间:2023.03.06 12:31:23

本文档将介绍如何使用 Python 版 SDK 来进行一个任务投递

使用说明

Python 版本需要不低于python 3,volcengine 安装 1.0.75 及以上的版本。

安装配置

使用 pip 安装 SDK for Python:

pip install volcengine

如果已经安装 volcengine 包,则用下面命令升级即可:

pip install --upgrade volcengine

PythonSDK快速开始

使用python SDK进行投递会有以下几个步骤

  1. 获取当前账号的访问密钥,具体可查看:Access Key(密钥)管理

  2. 创建一个Workspace(或者使用已有Workspace)

  3. 在Workspace中创建一个工作流(或者使用已有工作流)

  4. 为Workspace绑定集群(首次投递任务需要绑定集群,后续无需绑定)

  5. 执行投递

  6. 查看运行状态


  1. 使用SDK创建Workspace(使用已有Workspace中的工作流投递则可以跳过)

首先可以使用SDK创建一个Workspace,输入参数为名称和描述。具体参数可查看:CreateWorkspace--生信操作系统Bio-OS-火山引擎

coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

if __name__ == '__main__':
   # set endpoint/region here if the default value is unsatisfied
   bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')

   # call below method if you don't set ak and sk in $HOME/.volc/config
   bioos_service.set_ak('输入AK')
   bioos_service.set_sk('输入SK')

   params = {
       'Name': 'test_workspace',
       'Description': 'this is workspace_description'
   }

   resp = bioos_service.create_workspace(params)
   print(resp)

  1. 导入工作流 (使用已有Workspace中的工作流投递则可以跳过)

在刚才创建的Workspace中导入一个新的工作流,通过以下SDK的示例,您将会在Workspace中导入一个CramtoBam的工作流。具体参数可查看:CreateWorkflow--生信操作系统Bio-OS-火山引擎

# coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

if __name__ == '__main__':
    # set endpoint/region here if the default value is unsatisfied
    bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')

    # call below method if you don't set ak and sk in $HOME/.volc/config
    bioos_service.set_ak('输入AK')
    bioos_service.set_sk('输入SK')

    workspace_name = "test_workspace"
    #获取Workspace ID
    params_list_workspace = {
            'Filter':{
                'Keyword':workspace_name
            }
        }
    workspace_id = bioos_service.list_workspaces(params_list_workspace)['Items'][0]["ID"]
    print("workspace_id:",workspace_id)
    
    params = {
        'WorkspaceID': workspace_id,
        'Name': 'test_workflow',
        'Description': 'this is workflow_description',
        'Language': 'WDL',
        'Source': 'https://gitee.com/bio2s/gatk-demo.git',
        'Tag': 'master',
        'MainWorkflowPath': 'CramToBam.wdl',
    }

    resp = bioos_service.create_workflow(params)
    print(resp)

  1. 为Workspace绑定集群(首次投递任务需要绑定集群,后续无需绑定)

首次运行需绑定集群,默认可以考虑直接绑定共享集群,执行如下命令即可

# coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

if __name__ == '__main__':
    # set endpoint/region here if the default value is unsatisfied
    bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')

    # call below method if you don't set ak and sk in $HOME/.volc/config
    bioos_service.set_ak('输入AK')
    bioos_service.set_sk('输入SK')

    workspace_name = "test_workspace"
    #获取cluster ID
    params_list_sharedcluster = {
        # 'PageNumber': 1,
        # 'PageSize': 10,
        'Filter': {
            # 'IDs': ['test-workflow'],
            # 'Status': ['Running'],
            'Type': ['shared'],
            # 'Public': True,
        },
    }
    cluster_id = bioos_service.list_clusters(params_list_sharedcluster)['Items'][0]["ID"]
    print("cluster_id:",cluster_id)

    #获取Workspace ID
    params_list_workspace = {
            'Filter':{
                'Keyword':workspace_name
            }
        }
    workspace_id = bioos_service.list_workspaces(params_list_workspace)['Items'][0]["ID"]
    print("workspace_id:",workspace_id)

    #将Workspace绑定集群
    params = {
        'ID': workspace_id,
        'ClusterID': cluster_id,
        'Type': 'workflow',
    }

    resp = bioos_service.bind_cluster_to_workspace(params)
    print(resp)
  1. 使用工作流进行任务投递

在执行这个步骤前,有两个前置步骤

  1. 已经完成集群绑定:具体可查看步骤3

  2. 数据准备:您需要将投递任务中使用到的数据提前上传到tos中,以确保运行过程中输入参数中使用到的数据文件是可以被访问的

# coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

import json


if __name__ == '__main__':
    # set endpoint/region here if the default value is unsatisfied
    bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')

    # call below method if you don't set ak and sk in $HOME/.volc/config
    bioos_service.set_ak('输入AK')
    bioos_service.set_sk('输入SK')

    workflow_name= "Cram2Bam"
    workspace_name = "快速开始-工作流-copy"

    #获取cluster ID
    params_list_sharedcluster = {
        # 'PageNumber': 1,
        # 'PageSize': 10,
        'Filter': {
            # 'IDs': ['test-workflow'],
            # 'Status': ['Running'],
            'Type': ['shared'],
            # 'Public': True,
        },
    }
    cluster_id = bioos_service.list_clusters(params_list_sharedcluster)['Items'][0]["ID"]
    print("cluster_id:",cluster_id)

    #获取Workspace ID
    params_list_workspace = {
            'Filter':{
                'Keyword':workspace_name
            }
        }
    workspace_id = bioos_service.list_workspaces(params_list_workspace)['Items'][0]["ID"]
    print("workspace_id:",workspace_id)

    #获取workflow ID
    params_list_workflow = {
        'WorkspaceID': workspace_id,
        'Filter':{
                'Keyword':workflow_name
            }
        }
    workflow_id = bioos_service.list_workflows(params_list_workflow)['Items'][0]["ID"]
    print("workflow_id:",workflow_id)


    params_create_submission = {
        "Name": "Cram2Bam-test",
        'ClusterID': cluster_id,
        'WorkspaceID': workspace_id,
        'WorkflowID': workflow_id,
        'Description': 'this run is submitted using SDK',
        "Inputs": 
            '{"CramToBamFlow.input_cram":"s3://bioos-wcidtv1leig4am39n9lag/NA12878.cram","CramToBamFlow.gotc_docker":"biocontainers/samtools:v1.7.0_cv4", "CramToBamFlow.ref_dict":"workspace.Ref-dict","CramToBamFlow.ref_fasta":"workspace.Ref-fasta","CramToBamFlow.ref_fasta_index":"workspace.Ref-fasta-index","CramToBamFlow.sample_name":"test"}',
        "Outputs": "",
        'ExposedOptions': {'ReadFromCache': False,
                           "ExecutionRootDir": f"s3://bioos-{workspace_id}"
                           },

    }


    resp = bioos_service.create_submission(params_create_submission)
    print(resp)

  1. 查询运行状态

在Bio-OS的投递任务中主要分为三个层级: 投递(submission)-运行(Run)-Task(任务)
一次投递中可以用一个或批量的运行任务,而一个任务则可以由多个task组成。因此需要查询某个运行(run)时,需要传入这次运行所属的SubmissionID,当需要获取具体Task的运行状态时,则需要传入task所属的RunID,示例如下:

# coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

if __name__ == '__main__':
    # set endpoint/region here if the default value is unsatisfied
    bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')

    # call below method if you don't set ak and sk in $HOME/.volc/config
    bioos_service.set_ak('输入AK')
    bioos_service.set_sk('输入SK')

    workspace_name = "test_workspace"

    #获取Workspace ID
    params_list_workspace = {
            'Filter':{
                'Keyword':workspace_name
            }
        }
    workspace_id = bioos_service.list_workspaces(params_list_workspace)['Items'][0]["ID"]
    print("workspace_id:",workspace_id)

    #获取submission ID
    params_list_submission = {
        'WorkspaceID': workspace_id,
    }

    submissionID = bioos_service.list_submissions(params_list_submission)['Items'][0]["ID"]

    print("submission_id:",submissionID)

    #获取Run ID
    params_list_run = {
        'WorkspaceID': workspace_id,
        'SubmissionID': submissionID

    }

    RunID = bioos_service.list_runs(params_list_run)

    print("run_id:",RunID)

返回如下所示

run_id: {'Items': [{'ID': 'rcihbi2teig4bed1hheig', 'Status': 'Succeeded', 'StartTime': 1688385804, 'FinishTime': 1688386130, 'Duration': 326, 'SubmissionID': 'scihbi2teig4bed1hhei0', 'EngineRunID': 'ba3c2a91-2022-45af-b50e-feecb491d2f3', 'Inputs': '{"CramToBamFlow.gotc_docker":"biocontainers/samtools:v1.7.0_cv4","CramToBamFlow.input_cram":"s3://bioos-wcidtv1leig4am39n9lag/NA12878.cram","CramToBamFlow.ref_dict":"s3://bioos-wcidtv1leig4am39n9lag/Homo_sapiens_assembly38.dict","CramToBamFlow.ref_fasta":"s3://bioos-wcidtv1leig4am39n9lag/Homo_sapiens_assembly38.fasta","CramToBamFlow.ref_fasta_index":"s3://bioos-wcidtv1leig4am39n9lag/Homo_sapiens_assembly38.fasta.fai","CramToBamFlow.sample_name":"test"}', 'Outputs': '{"CramToBamFlow.outputBai":"s3://bioos-wcih94qdeig4am39nf8ug/analysis/scihbi2teig4bed1hhei0/CramToBamFlow/ba3c2a91-2022-45af-b50e-feecb491d2f3/call-CramToBamTask/execution/test.bai","CramToBamFlow.outputBam":"s3://bioos-wcih94qdeig4am39nf8ug/analysis/scihbi2teig4bed1hhei0/CramToBamFlow/ba3c2a91-2022-45af-b50e-feecb491d2f3/call-CramToBamTask/execution/test.bam"}', 'TaskStatus': {'Count': 1, 'Succeeded': 1, 'Failed': 0, 'Running': 0, 'Queued': 0, 'Initializing': 0, 'Cancelled': 0}, 'Log': 's3://bioos-wcih94qdeig4am39nf8ug/analysis/scihbi2teig4bed1hhei0/workflow.ba3c2a91-2022-45af-b50e-feecb491d2f3.log'}], 'PageNumber': 1, 'PageSize': 10, 'TotalCount': 1}