You need to enable JavaScript to run this app.
导航
Python Query SDK
最近更新时间:2025.02.06 11:55:52首次发布时间:2024.05.16 21:00:28

简介

Python Query SDK 帮助 Serverless Spark 用户更加轻松地通过 Python 语言使用 Serverless Spark 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取、上传资源等。
本文提供了上述功能的示例代码,方便参考使用。

概念说明

系统概念

  • Endpoint:表示 Serverless Spark 对外服务的 API 域名
  • Region:表示 Serverless Spark 的数据中心所在的物理区域

目前 Serverless Spark 支持的地域和 API 域名如下表所示:

Region(中文名称)

Region

Endpoint

华北-北京

cn-beijing

open.volcengineapi.com

华东-上海

cn-shanghai

open.volcengineapi.com

华北-广州

cn-guangzhou

open.volcengineapi.com

亚太东南-柔佛

ap-southeast-1

open.volcengineapi.com

  • Access Key / Secret Access Key:访问火山引擎 API 的密钥;用户可以通过火山引擎的“密钥管理”页面获取到 Access Key 和 Secret Access Key。

内部概念

  • Schema:一个包含数据表、资源、UDF 等信息的集合空间概念。
  • Task:定义某次任务的执行信息,包括 查询 SQL、执行方式(同步/异步)、任务名、参数等信息。
  • Job:表示某次 Task 执行生成的任务实例。
  • Result:表示某次 Job 的运行结果。
  • ResultSchema:运行结果的 Schema 信息。
  • Record:表示运行结果的结果集中的一行记录。

安装 SDK

Python 3.6+安装包如下:

python_serverless-1.0.1-py3-none-any .whl
未知大小

直接使用 wheel 安装:

# 解压安装包,用户需输入实际安装包名称
$ unzip python_serverless-1.0.1-py3-none-any.whl.zip 
# 安装SDK,用户需输入实际安装包名称
$ pip3 install python_serverless-1.0.1-py3-none-any.whl

快速入门

初始化客户端

Python Query SDK 目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:

from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient

ak = 'your ak'
sk = 'your sk'
region = 'cn-beijing'
endpoint = 'open.volcengineapi.com'
service = 'emr_serverless'
connection_timeout = 30
socket_timeout = 30

client = ServerlessClient(credentials=StaticCredentials(ak, sk), 
    region=region, endpoint=endpoint, service=service,
    connection_timeout=connection_timeout, 
    socket_timeout=socket_timeout)

ServerlessClient 客户端是后续调用各种 Serverless Spark 功能的入口,当前版本 ServerlessClient 提供如下 API 接口:

API

功能

execute

执行作业

cancel_job

取消任务

get_job

获取任务实例状态

get_result

获取作业结果

第一个查询

初始化 Client 完成后,可通过执行相关 Task(目前支持 SQL,SparkJar 两种任务类型)来进行任务执行。
如下为一个进行简单 SQL 查询的例子:

sql = """
    SELECT * FROM `${your_schema}`.`${your_table}` LIMIT 100
"""

# 同步执行查询
job = client.execute(task=SQLTask(name="first query task", 
                            query=sql,
                            conf={}),
                    is_sync=True)

# 获取查询结果
if job.is_success():
    result = job.get_result()
    for record in result:
        print(', '.join([col for col in record]))

作业运行&取消

本节将以代码示例的形式展示更多 Serverless Spark 功能的使用方式。

提交 SQL 任务

SQLTask 是用于执行 SQL 查询任务的接口。主要提供如下参数:

参数

类型

是否必须

描述

query

str

Y

sql 语句

name

str

N

任务名

说明

如果不指定会以 SQLTask_${current_timestamp} 的方式生成

conf

dict

N

用于指定任务参数,默认为空

queue

str

N

指定运行队列名,不填则将选用公共队列

示例:

def execute_sql_task():
    from serverless.client import ServerlessClient
    from serverless.auth import StaticCredentials
    from serverless.task import SQLTask
    from serverless.exceptions import QuerySdkError

    ak = 'xxxx'
    sk = 'xxxx'
    region = 'cn-beijing'
    service = 'emr_serverless'
    endpoint = 'open.volcengineapi.com'
    sql = '${CUSTOM_SQL_STATEMEMT}'

    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,endpoint=endpoint)

    try:
        job = client.execute(task=SQLTask(name='sql task', query=sql, 
        conf={
              # 计算组名,可选,默认选用Default计算组
              #"serverless.compute.group.name": "xxx"
        }), is_sync=True)
        if job.is_success():
            result = job.get_result()
            for record in result:
                print(', '.join([col for col in record]))

    except QuerySdkError as e:
        print(
            "Error in executing sql task. code = %s, error = %s" % (
            e.error_code, e.info))


if __name__ == "__main__":
    execute_sql_task()

提交 SparkJar 任务

SparkJar 任务为用户提供了通过编写 Spark 应用进行定制化数据分析需求的支持。详见 Spark Jar 作业开发指南 文档。
JarTask 是 SDK 提供 SparkJar 任务执行的接口:

参数

类型

是否必须

描述

jar

str

Y

任务执行时使用的 SparkJar 资源,需传入tos路径,例如:['tos://bucket/path/to/jar']

main_class

str

Y

Spark application 的 main class

main_args

list

N

spark application 的 main function 参数;不传默认为 empty list

name

str

N

任务名;如果不指定会以 SparkJarTask_${current_time} 的方式生成

conf

dict

N

用于指定任务参数,默认为空

queue

str

N

指定运行队列名,不填则将选用公共队列

depend_jars

[]str

N

依赖的jar文件,对应spark-submit的--jars选项,例如:['tos://bucket/path/to/jar']

files

[]str

N

依赖的文件,对应spark-submit的--files选项, ,例如:['tos://bucket/path/to/file']

archives

[]str

N

依赖的archive文件,对应spark-submit的--archieves选项, ,例如:['tos://bucket/path/to/archive']

示例:

def execute_spark_task():
    from serverless.client import ServerlessClient
    from serverless.auth import StaticCredentials
    from serverless.task import JarTask

    ak = 'xxxx'
    sk = 'xxxx'
    region = 'cn-beijing'
    service = 'emr_serverless'
    endpoint = 'open.volcengineapi.com'

    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)

    jar_resource = 'tos://bucket/path/to/jar'
    main_class = 'com.xxx.xxx'
    main_args = ['arg_xxx', 'arg_yyy']
    job = client.execute(task=JarTask(name="spark task test",
        jar=jar_resource,
        main_class=main_class,
        main_args=main_args,
        conf={'serverless.spark.access.key': '${ak}', 'serverless.spark.secret.key': '${sk}'}
),
    is_sync=True)

    print('The task executed successfully.')
    print('Tracking ui: %s' % job.get_tracking_url())


if __name__ == "__main__":
    execute_spark_task()

提交 PySpark 任务

Pyspark 任务为用户提供了通过编写 Python Spark 应用进行定制化数据分析需求的支持。详见 PySpark作业开发指南
PySparkTask 是 SDK 提供 PySpark 任务执行的接口:

参数

类型

是否必须

描述

script

str

Y

任务执行时使用的 SparkJar 资源,需传入tos路径,例如:['tos://bucket/path/to/pyfile']

args

list

N

spark application 的 main function 参数,默认为 empty list

name

str

N

任务名。如果不指定会以 SparkJarTask_${current_time} 的方式生成

conf

dict

N

用于指定任务参数,默认为空

queue

str

N

指定运行队列名,不填则将选用公共队列

depend_jars

[]str

N

依赖的jar文件,对应spark-submit的--jars选项,例如:['tos://bucket/path/to/jar']

files

[]str

N

依赖的文件,对应spark-submit的--files选项, ,例如:['tos://bucket/path/to/file']

archives

[]str

N

依赖的archive文件,对应spark-submit的--archieves选项, ,例如:['tos://bucket/path/to/archive']

pyfiles

[]str

N

依赖的pyfile文件,对应spark-submit的--pyfiles选项,,例如:['tos://bucket/path/to/pyfile']

示例:

def execute_pyspark_task():
    from serverless.client import ServerlessClient
    from serverless.auth import StaticCredentials
    from serverless.task import PySparkTask

    ak = 'xxxx'
    sk = 'xxxx'
    region = 'cn-beijing'
    service = 'emr_serverless'
    endpoint = 'open.volcengineapi.com'

    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)

    pyscript_resource = 'tos://bucket/path/to/pyfile'
    main_args = ['arg_xxx', 'arg_yyy']
    files = ['tos://bucket/path/to/dependency_pyfile']
    task = PySparkTask(name='pyspark task test',
        # queue='${queue_name}',
        script=pyscript_resource,
        args=main_args,
        conf={'serverless.spark.access.key': '${tos_ak}',
            'serverless.spark.secret.key': '${tos_sk}'},
        files=files
    )
        
    job = client.execute(task=task, is_sync=True)

    print('The task executed successfully.')
    print('Tracking ui: %s' % job.get_tracking_url())


if __name__ == "__main__":
    execute_pyspark_task()

提交Ray作业

from serverless import RayJobTask

_job = _client.execute(task=RayJobTask(name="test rayjob task",
        conf={
             # 计算组名,可选,默认选用Default计算组
             #"serverless.compute.group.name": "xxx",
             # 如需使用自定义镜像,添加该配置,替换镜像地址
             #"serverless.ray.version":"2.22.0",
            #"serverless.ray.image": "emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-ray/ray:2.30.0-py3.11-ubuntu20.04-240"
                                         
        },
        head_cpu='4',
        head_memory='16Gi',
        worker_cpu='4',
        worker_memory='16Gi',
        worker_replicas=2,
        # entrypoint_cmd 格式为 /home/ray/workdir/{zip包名}/{py文件名}
        # 其中 /home/ray/workdir 目前为固定值
        entrypoint_cmd='python /home/ray/workdir/rayzip.py',
        # tos 文件路径
        entrypoint_resource='tos://wbw-dev/rayunzip/rayzip.py.zip',
        runtime_env={
            # 需要runtime引入的pip包
            "pip": ["requests==2.26.0", "pendulum==2.1.2"],
            "env_vars": {
                "counter_name": "test_counter"
            }
        # queue=xxx
        }
    ),
        is_sync=False)

参数说明:

参数

描述

name

作业名称

head_cpu & head_memory

设置Ray head node的cpu和memory,格式与k8s resource一致,例如 4Gi

worker_cpu & worker_memory

设置Ray worker node的cpu和memory,格式与k8s resource一致,例如 4Gi

worker_replicas

设置Ray worker node的节点个数

entrypoint_cmd

用于运行作业的entrypoint shell command

entrypoint_resource

用于运行作业的相关资源文件,目前支持传入一个tos路径,作业执行时将会对该文件进行解压

runtime_env

设置Ray作业的运行时环境,可用于指定环境变量/工作目录/pip安装依赖

is_sync

设置作业是否是同步执行;设置为true,则execute函数将会同步阻塞直至作业结束,设置为false,则execute函数在任务提交后将会立即返回

同步/异步执行

通过is_sync 参数进行控制:

参数

类型

是否必须

描述

task

Task

Y

需要执行的任务

is_sync

bool

Y

是否同步执行

timeout

int

N

同步执行的超时时间,单位 :s

# 异步
client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=False)

# 同步执行,3 分钟超时
client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=True, timeout=180)

取消任务

# 取消任务可以通过 job 实例;也可以通过 ServerlessClient 进行取消
job.cancel()
client.cancel_job(job)

查看任务实例相关信息

获取任务实例

可以根据任务 ID 进行任务实例的获取:

job = client.get_job(jobId)

获取引擎侧任务执行 UI

从拿到的任务实例获取任务对应的 Spark UI 页面:

job.get_tracking_url()

查看执行日志

_job = _client.get_job('${job_id}')
_log_cursor = _client.get_driver_log(_job)
while _log_cursor.has_next():
    _log_cursor.fetch_next_page()
    current_rows = _log_cursor.current_rows
    for log_entry in current_rows:
        print(log_entry)

查询提交日志

# 查看提交日志
_log_cursor = _client.get_submission_log(_submission_failed_job)
while _log_cursor.has_next():
    _log_cursor.fetch_next_page()
    current_rows = _log_cursor.current_rows
    for log_entry in current_rows:
        print(log_entry)

等待任务

异步调用后,如果想重新同步阻塞等待任务到达某种状态,可以尝试调用 wait_for() 函数:

# 等待任务结束
job.wait_for_finished()

# 自定义结束状态
def when_to_exit() -> bool:
    return job.get_tracking_url() is not None

job.wait_for(when_to_exit=_when_to_exit, timeout=180)

获取查询结果

对job实例调用 get_result() 获取任务的查询结果:

result = job.get_result()

# 或者由 serverless client 侧获取
result = client.get_result(job)

for record in result:
    print("row: (%s, %s, %s)" % (record[0], record[1], record[2]))

7 执行异常

任务异常将会以 QuerySdkError 的形式进行抛出,exception message 内携带具体的执行错误信息。