This document describes how to submit a task with the Python SDK.
Python 3 or later is required, and the volcengine package must be version 1.0.75 or later.
Install the SDK for Python with pip:
pip install volcengine
If the volcengine package is already installed, upgrade it with the following command:
pip install --upgrade volcengine
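To confirm that the installed package meets the 1.0.75 minimum, a small check along these lines may help. It is a minimal sketch: the helper names are hypothetical, `importlib.metadata` needs Python 3.8 or later, and the parser only compares plain dotted numbers (non-numeric parts such as pre-release suffixes are ignored).

```python
from importlib.metadata import PackageNotFoundError, version


def parse_version(text):
    """Turn a dotted version such as '1.0.75' into a tuple of ints."""
    return tuple(int(part) for part in text.split(".") if part.isdigit())


def meets_minimum(installed, minimum="1.0.75"):
    """Return True when `installed` is at least `minimum`."""
    return parse_version(installed) >= parse_version(minimum)


def installed_volcengine_version():
    """Return the installed volcengine version, or None if it is missing."""
    try:
        return version("volcengine")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_volcengine_version()
    if found is None:
        print("volcengine is not installed")
    else:
        print(found, "ok" if meets_minimum(found) else "too old, please upgrade")
```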
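Submitting a task with the Python SDK involves the following steps:
Obtain the access key of the current account. For details, see: Access Key Management
Create a Workspace (or use an existing Workspace)
Create a workflow in the Workspace (or use an existing workflow)
Bind a cluster to the Workspace (required only before the first submission; later submissions do not need it)
Submit the task
Check the run status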
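The examples in this document pass the AK and SK as string literals. One way to keep them out of source files is to read them from the environment first. The sketch below is an assumption for illustration, not part of the SDK: the helper name and the variable names `VOLC_ACCESSKEY` and `VOLC_SECRETKEY` are chosen here and can be replaced with whatever your environment uses.

```python
import os


def load_credentials(env=None):
    """Read AK/SK from an environment-style mapping; fail early when missing."""
    env = os.environ if env is None else env
    # Variable names are assumptions made for this sketch.
    ak = env.get("VOLC_ACCESSKEY", "").strip()
    sk = env.get("VOLC_SECRETKEY", "").strip()
    missing = [
        name
        for name, value in (("VOLC_ACCESSKEY", ak), ("VOLC_SECRETKEY", sk))
        if not value
    ]
    if missing:
        raise RuntimeError("missing credentials: " + ", ".join(missing))
    return ak, sk
```

The returned pair can then be handed to `bioos_service.set_ak(ak)` and `bioos_service.set_sk(sk)` in place of the literals.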
First, use the SDK to create a Workspace. The input parameters are a name and a description. For parameter details, see: CreateWorkspace (Bio-OS, Volcengine)
# coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

if __name__ == '__main__':
    # Set endpoint/region here if the default values are not suitable
    bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')
    # Call the methods below if AK and SK are not set in $HOME/.volc/config
    bioos_service.set_ak('your AK')
    bioos_service.set_sk('your SK')

    params = {
        'Name': 'test_workspace',
        'Description': 'this is workspace_description',
    }
    resp = bioos_service.create_workspace(params)
    print(resp)
Next, import a new workflow into the Workspace you just created. The SDK example below imports a CramToBam workflow into the Workspace. For parameter details, see: CreateWorkflow (Bio-OS, Volcengine)
# coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

if __name__ == '__main__':
    # Set endpoint/region here if the default values are not suitable
    bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')
    # Call the methods below if AK and SK are not set in $HOME/.volc/config
    bioos_service.set_ak('your AK')
    bioos_service.set_sk('your SK')

    workspace_name = 'test_workspace'

    # Get the Workspace ID (takes the first item returned for the keyword)
    params_list_workspace = {
        'Filter': {
            'Keyword': workspace_name,
        },
    }
    workspace_id = bioos_service.list_workspaces(params_list_workspace)['Items'][0]['ID']
    print("workspace_id:", workspace_id)

    # Import the workflow from a Git repository
    params = {
        'WorkspaceID': workspace_id,
        'Name': 'test_workflow',
        'Description': 'this is workflow_description',
        'Language': 'WDL',
        'Source': 'https://gitee.com/bio2s/gatk-demo.git',
        'Tag': 'master',
        'MainWorkflowPath': 'CramToBam.wdl',
    }
    resp = bioos_service.create_workflow(params)
    print(resp)
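The lookup `['Items'][0]['ID']` used in these examples raises an `IndexError` when nothing matches, and silently picks the first item when several do. A more defensive variant might look like the sketch below. It assumes each item in the list response carries a `Name` field next to `ID`, which is not confirmed by the examples in this document, and the helper name is hypothetical.

```python
def find_id_by_name(resp, name):
    """Return the ID of the single item whose Name equals `name`."""
    items = resp.get("Items") or []
    # Assumption: list responses expose a 'Name' field on every item.
    matches = [item for item in items if item.get("Name") == name]
    if not matches:
        raise LookupError("no item named %r in the response" % name)
    if len(matches) > 1:
        raise LookupError("%d items named %r; refine the filter" % (len(matches), name))
    return matches[0]["ID"]
```

Under that assumption it could replace the direct indexing, for example `find_id_by_name(bioos_service.list_workspaces(params_list_workspace), workspace_name)`.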
A cluster must be bound before the first run. By default, you can simply bind the shared cluster by running the following code:
# coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

if __name__ == '__main__':
    # Set endpoint/region here if the default values are not suitable
    bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')
    # Call the methods below if AK and SK are not set in $HOME/.volc/config
    bioos_service.set_ak('your AK')
    bioos_service.set_sk('your SK')

    workspace_name = 'test_workspace'

    # Get the cluster ID of the shared cluster
    params_list_sharedcluster = {
        # 'PageNumber': 1,
        # 'PageSize': 10,
        'Filter': {
            # 'IDs': ['test-workflow'],
            # 'Status': ['Running'],
            'Type': ['shared'],
            # 'Public': True,
        },
    }
    cluster_id = bioos_service.list_clusters(params_list_sharedcluster)['Items'][0]['ID']
    print("cluster_id:", cluster_id)

    # Get the Workspace ID
    params_list_workspace = {
        'Filter': {
            'Keyword': workspace_name,
        },
    }
    workspace_id = bioos_service.list_workspaces(params_list_workspace)['Items'][0]['ID']
    print("workspace_id:", workspace_id)

    # Bind the cluster to the Workspace
    params = {
        'ID': workspace_id,
        'ClusterID': cluster_id,
        'Type': 'workflow',
    }
    resp = bioos_service.bind_cluster_to_workspace(params)
    print(resp)
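Before submitting, two prerequisites must be met:
Cluster binding is complete: see the cluster binding step above
Data preparation: upload the data used by the submission to TOS in advance, so that the data files referenced in the input parameters are accessible at run time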
# coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

if __name__ == '__main__':
    # Set endpoint/region here if the default values are not suitable
    bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')
    # Call the methods below if AK and SK are not set in $HOME/.volc/config
    bioos_service.set_ak('your AK')
    bioos_service.set_sk('your SK')

    # Replace these with the names of your own workflow and Workspace
    workflow_name = 'Cram2Bam'
    workspace_name = '快速开始-工作流-copy'

    # Get the cluster ID of the shared cluster
    params_list_sharedcluster = {
        # 'PageNumber': 1,
        # 'PageSize': 10,
        'Filter': {
            # 'IDs': ['test-workflow'],
            # 'Status': ['Running'],
            'Type': ['shared'],
            # 'Public': True,
        },
    }
    cluster_id = bioos_service.list_clusters(params_list_sharedcluster)['Items'][0]['ID']
    print("cluster_id:", cluster_id)

    # Get the Workspace ID
    params_list_workspace = {
        'Filter': {
            'Keyword': workspace_name,
        },
    }
    workspace_id = bioos_service.list_workspaces(params_list_workspace)['Items'][0]['ID']
    print("workspace_id:", workspace_id)

    # Get the workflow ID
    params_list_workflow = {
        'WorkspaceID': workspace_id,
        'Filter': {
            'Keyword': workflow_name,
        },
    }
    workflow_id = bioos_service.list_workflows(params_list_workflow)['Items'][0]['ID']
    print("workflow_id:", workflow_id)

    # Inputs is a JSON string keyed by "<workflow>.<input name>"
    params_create_submission = {
        'Name': 'Cram2Bam-test',
        'ClusterID': cluster_id,
        'WorkspaceID': workspace_id,
        'WorkflowID': workflow_id,
        'Description': 'this run is submitted using SDK',
        'Inputs': (
            '{"CramToBamFlow.input_cram":"s3://bioos-wcidtv1leig4am39n9lag/NA12878.cram",'
            '"CramToBamFlow.gotc_docker":"biocontainers/samtools:v1.7.0_cv4",'
            '"CramToBamFlow.ref_dict":"workspace.Ref-dict",'
            '"CramToBamFlow.ref_fasta":"workspace.Ref-fasta",'
            '"CramToBamFlow.ref_fasta_index":"workspace.Ref-fasta-index",'
            '"CramToBamFlow.sample_name":"test"}'
        ),
        'Outputs': '',
        'ExposedOptions': {
            'ReadFromCache': False,
            'ExecutionRootDir': f"s3://bioos-{workspace_id}",
        },
    }
    resp = bioos_service.create_submission(params_create_submission)
    print(resp)
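Because `Inputs` is passed as a JSON string, writing it by hand is easy to get wrong (a stray quote or comma only surfaces at submission time). A possible alternative is to build it from a dictionary with `json.dumps`. The helper below is a hypothetical sketch; the values are the ones from the submission example above.

```python
import json


def build_inputs(workflow, values):
    """Prefix each key with the workflow name and serialize to a JSON string."""
    qualified = {"%s.%s" % (workflow, key): value for key, value in values.items()}
    return json.dumps(qualified, ensure_ascii=False)


inputs = build_inputs("CramToBamFlow", {
    "input_cram": "s3://bioos-wcidtv1leig4am39n9lag/NA12878.cram",
    "gotc_docker": "biocontainers/samtools:v1.7.0_cv4",
    "ref_dict": "workspace.Ref-dict",
    "ref_fasta": "workspace.Ref-fasta",
    "ref_fasta_index": "workspace.Ref-fasta-index",
    "sample_name": "test",
})
print(inputs)
```

The resulting string can be used as the value of `'Inputs'` in the submission parameters.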
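A submitted job in Bio-OS has three levels: submission, run, and task.
A submission can contain a single run or a batch of runs, and a run can consist of multiple tasks. To query a run, you therefore pass the SubmissionID the run belongs to; to get the status of a specific task, you pass the RunID the task belongs to. For example: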
# coding:utf-8
from __future__ import print_function

from volcengine.bioos.BioOsService import BioOsService

if __name__ == '__main__':
    # Set endpoint/region here if the default values are not suitable
    bioos_service = BioOsService(endpoint='https://open.volcengineapi.com', region='cn-beijing')
    # Call the methods below if AK and SK are not set in $HOME/.volc/config
    bioos_service.set_ak('your AK')
    bioos_service.set_sk('your SK')

    workspace_name = 'test_workspace'

    # Get the Workspace ID
    params_list_workspace = {
        'Filter': {
            'Keyword': workspace_name,
        },
    }
    workspace_id = bioos_service.list_workspaces(params_list_workspace)['Items'][0]['ID']
    print("workspace_id:", workspace_id)

    # Get the submission ID (first submission in the list)
    params_list_submission = {
        'WorkspaceID': workspace_id,
    }
    submission_id = bioos_service.list_submissions(params_list_submission)['Items'][0]['ID']
    print("submission_id:", submission_id)

    # List the runs of the submission; the full response is printed,
    # and each entry in 'Items' carries the run ID and status
    params_list_run = {
        'WorkspaceID': workspace_id,
        'SubmissionID': submission_id,
    }
    runs = bioos_service.list_runs(params_list_run)
    print("run_id:", runs)
The response looks like this:
run_id: {'Items': [{'ID': 'rcihbi2teig4bed1hheig', 'Status': 'Succeeded', 'StartTime': 1688385804, 'FinishTime': 1688386130, 'Duration': 326, 'SubmissionID': 'scihbi2teig4bed1hhei0', 'EngineRunID': 'ba3c2a91-2022-45af-b50e-feecb491d2f3', 'Inputs': '{"CramToBamFlow.gotc_docker":"biocontainers/samtools:v1.7.0_cv4","CramToBamFlow.input_cram":"s3://bioos-wcidtv1leig4am39n9lag/NA12878.cram","CramToBamFlow.ref_dict":"s3://bioos-wcidtv1leig4am39n9lag/Homo_sapiens_assembly38.dict","CramToBamFlow.ref_fasta":"s3://bioos-wcidtv1leig4am39n9lag/Homo_sapiens_assembly38.fasta","CramToBamFlow.ref_fasta_index":"s3://bioos-wcidtv1leig4am39n9lag/Homo_sapiens_assembly38.fasta.fai","CramToBamFlow.sample_name":"test"}', 'Outputs': '{"CramToBamFlow.outputBai":"s3://bioos-wcih94qdeig4am39nf8ug/analysis/scihbi2teig4bed1hhei0/CramToBamFlow/ba3c2a91-2022-45af-b50e-feecb491d2f3/call-CramToBamTask/execution/test.bai","CramToBamFlow.outputBam":"s3://bioos-wcih94qdeig4am39nf8ug/analysis/scihbi2teig4bed1hhei0/CramToBamFlow/ba3c2a91-2022-45af-b50e-feecb491d2f3/call-CramToBamTask/execution/test.bam"}', 'TaskStatus': {'Count': 1, 'Succeeded': 1, 'Failed': 0, 'Running': 0, 'Queued': 0, 'Initializing': 0, 'Cancelled': 0}, 'Log': 's3://bioos-wcih94qdeig4am39nf8ug/analysis/scihbi2teig4bed1hhei0/workflow.ba3c2a91-2022-45af-b50e-feecb491d2f3.log'}], 'PageNumber': 1, 'PageSize': 10, 'TotalCount': 1}
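In the response above, `Outputs` is itself a JSON string, and a run that has just been submitted will not yet be finished. The sketch below shows one way to poll until every run has ended and then decode the outputs. The helper names are hypothetical, and the set of terminal states is an assumption inferred from the status names visible in the sample response, not a documented list.

```python
import json
import time

# Assumed terminal run states, inferred from the status names in the sample response.
TERMINAL = {"Succeeded", "Failed", "Cancelled"}


def summarize_runs(resp):
    """Return one (run_id, status, outputs_dict) tuple per run in the response."""
    summary = []
    for run in resp.get("Items", []):
        raw = run.get("Outputs") or "{}"  # empty while the run is unfinished
        summary.append((run["ID"], run["Status"], json.loads(raw)))
    return summary


def wait_for_runs(fetch, interval=30.0, max_polls=120, sleep=time.sleep):
    """Call `fetch()` until every run is terminal; return the last response."""
    for _ in range(max_polls):
        resp = fetch()
        items = resp.get("Items", [])
        if items and all(run["Status"] in TERMINAL for run in items):
            return resp
        sleep(interval)
    raise TimeoutError("runs did not finish within the polling budget")
```

With the objects from the example above, usage might look like `final = wait_for_runs(lambda: bioos_service.list_runs(params_list_run))` followed by `summarize_runs(final)`. Passing the fetch step as a callable keeps the polling logic independent of the SDK.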