Python Query SDK 帮助 Serverless Spark 用户更加轻松地通过 Python 语言使用 Serverless Spark 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取、上传资源等。
本文提供了上述功能的示例代码,方便参考使用。
目前 Serverless Spark 支持的地域和 API 域名如下表所示:
Region(中文名称) | Region | Endpoint |
---|---|---|
华北-北京 | cn-beijing | open.volcengineapi.com |
华东-上海 | cn-shanghai | open.volcengineapi.com |
华北-广州 | cn-guangzhou | open.volcengineapi.com |
亚太东南-柔佛 | ap-southeast-1 | open.volcengineapi.com |
Python 3.6+安装包如下:
# 解压安装包,用户需输入实际安装包名称 $ unzip python_serverless-1.0.1-py3-none-any.whl.zip # 安装SDK,用户需输入实际安装包名称 $ pip3 install python_serverless-1.0.1-py3-none-any.whl
Python Query SDK 目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:
from serverless.auth import StaticCredentials from serverless.client import ServerlessClient ak = 'your ak' sk = 'your sk' region = 'cn-beijing' endpoint = 'open.volcengineapi.com' service = 'emr_serverless' connection_timeout = 30 socket_timeout = 30 client = ServerlessClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint, service=service, connection_timeout=connection_timeout, socket_timeout=socket_timeout)
ServerlessClient
客户端是后续调用各种 Serverless Spark 功能的入口,当前版本 ServerlessClient 提供如下 API 接口:
API | 功能 |
---|---|
execute | 执行作业 |
cancel_job | 取消任务 |
get_job | 获取任务实例状态 |
get_result | 获取作业结果 |
初始化 Client 完成后,可通过执行相关 Task(目前支持 SQL,SparkJar 两种任务类型)来进行任务执行。
如下为一个进行简单 SQL 查询的例子:
sql = """ SELECT * FROM `${your_schema}`.`${your_table}` LIMIT 100 """ # 同步执行查询 job = client.execute(task=SQLTask(name="first query task", query=sql, conf={}), is_sync=True) # 获取查询结果 if job.is_success(): result = job.get_result() for record in result: print(', '.join([col for col in record]))
本节将以代码示例的形式展示更多 Serverless Spark 功能的使用方式。
SQLTask 是用于执行 SQL 查询任务的接口。主要提供如下参数:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
query | str | Y | sql 语句 |
name | str | N | 任务名 说明 如果不指定会以 SQLTask_${current_timestamp} 的方式生成 |
conf | dict | N | 用于指定任务参数,默认为空 |
queue | str | N | 指定运行队列名,不填则将选用公共队列 |
示例:
def execute_sql_task(): from serverless.client import ServerlessClient from serverless.auth import StaticCredentials from serverless.task import SQLTask from serverless.exceptions import QuerySdkError ak = 'xxxx' sk = 'xxxx' region = 'cn-beijing' service = 'emr_serverless' endpoint = 'open.volcengineapi.com' sql = '${CUSTOM_SQL_STATEMEMT}' client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,endpoint=endpoint) try: job = client.execute(task=SQLTask(name='sql task', query=sql, conf={ # 计算组名,可选,默认选用Default计算组 #"serverless.compute.group.name": "xxx" }), is_sync=True) if job.is_success(): result = job.get_result() for record in result: print(', '.join([col for col in record])) except QuerySdkError as e: print( "Error in executing sql task. code = %s, error = %s" % ( e.error_code, e.info)) if __name__ == "__main__": execute_sql_task()
SparkJar 任务为用户提供了通过编写 Spark 应用进行定制化数据分析需求的支持。详见 Spark Jar 作业开发指南 文档。
JarTask 是 SDK 提供 SparkJar 任务执行的接口:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
jar | str | Y | 任务执行时使用的 SparkJar 资源,需传入tos路径,例如:['tos://bucket/path/to/jar'] |
main_class | str | Y | Spark application 的 main class |
main_args | list | N | spark application 的 main function 参数;不传默认为 empty list |
name | str | N | 任务名;如果不指定会以 |
conf | dict | N | 用于指定任务参数,默认为空 |
queue | str | N | 指定运行队列名,不填则将选用公共队列 |
depend_jars | []str | N | 依赖的jar文件,对应spark-submit的--jars选项,例如:['tos://bucket/path/to/jar'] |
files | []str | N | 依赖的文件,对应spark-submit的--files选项, ,例如:['tos://bucket/path/to/file'] |
archives | []str | N | 依赖的archive文件,对应spark-submit的--archieves选项, ,例如:['tos://bucket/path/to/archive'] |
示例:
def execute_spark_task(): from serverless.client import ServerlessClient from serverless.auth import StaticCredentials from serverless.task import JarTask ak = 'xxxx' sk = 'xxxx' region = 'cn-beijing' service = 'emr_serverless' endpoint = 'open.volcengineapi.com' client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint) jar_resource = 'tos://bucket/path/to/jar' main_class = 'com.xxx.xxx' main_args = ['arg_xxx', 'arg_yyy'] job = client.execute(task=JarTask(name="spark task test", jar=jar_resource, main_class=main_class, main_args=main_args, conf={'serverless.spark.access.key': '${ak}', 'serverless.spark.secret.key': '${sk}'} ), is_sync=True) print('The task executed successfully.') print('Tracking ui: %s' % job.get_tracking_url()) if __name__ == "__main__": execute_spark_task()
Pyspark 任务为用户提供了通过编写 Python Spark 应用进行定制化数据分析需求的支持。详见 PySpark作业开发指南。
PySparkTask 是 SDK 提供 PySpark 任务执行的接口:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
script | str | Y | 任务执行时使用的 SparkJar 资源,需传入tos路径,例如:['tos://bucket/path/to/pyfile'] |
args | list | N | spark application 的 main function 参数,默认为 empty list |
name | str | N | 任务名。如果不指定会以 |
conf | dict | N | 用于指定任务参数,默认为空 |
queue | str | N | 指定运行队列名,不填则将选用公共队列 |
depend_jars | []str | N | 依赖的jar文件,对应spark-submit的--jars选项,例如:['tos://bucket/path/to/jar'] |
files | []str | N | 依赖的文件,对应spark-submit的--files选项, ,例如:['tos://bucket/path/to/file'] |
archives | []str | N | 依赖的archive文件,对应spark-submit的--archieves选项, ,例如:['tos://bucket/path/to/archive'] |
pyfiles | []str | N | 依赖的pyfile文件,对应spark-submit的--pyfiles选项,,例如:['tos://bucket/path/to/pyfile'] |
示例:
def execute_pyspark_task(): from serverless.client import ServerlessClient from serverless.auth import StaticCredentials from serverless.task import PySparkTask ak = 'xxxx' sk = 'xxxx' region = 'cn-beijing' service = 'emr_serverless' endpoint = 'open.volcengineapi.com' client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint) pyscript_resource = 'tos://bucket/path/to/pyfile' main_args = ['arg_xxx', 'arg_yyy'] files = ['tos://bucket/path/to/dependency_pyfile'] task = PySparkTask(name='pyspark task test', # queue='${queue_name}', script=pyscript_resource, args=main_args, conf={'serverless.spark.access.key': '${tos_ak}', 'serverless.spark.secret.key': '${tos_sk}'}, files=files ) job = client.execute(task=task, is_sync=True) print('The task executed successfully.') print('Tracking ui: %s' % job.get_tracking_url()) if __name__ == "__main__": execute_pyspark_task()
from serverless import RayJobTask _job = _client.execute(task=RayJobTask(name="test rayjob task", conf={ # 计算组名,可选,默认选用Default计算组 #"serverless.compute.group.name": "xxx", # 如需使用自定义镜像,添加该配置,替换镜像地址 #"serverless.ray.version":"2.22.0", #"serverless.ray.image": "emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-ray/ray:2.30.0-py3.11-ubuntu20.04-240" }, head_cpu='4', head_memory='16Gi', worker_cpu='4', worker_memory='16Gi', worker_replicas=2, # entrypoint_cmd 格式为 /home/ray/workdir/{zip包名}/{py文件名} # 其中 /home/ray/workdir 目前为固定值 entrypoint_cmd='python /home/ray/workdir/rayzip.py', # tos 文件路径 entrypoint_resource='tos://wbw-dev/rayunzip/rayzip.py.zip', runtime_env={ # 需要runtime引入的pip包 "pip": ["requests==2.26.0", "pendulum==2.1.2"], "env_vars": { "counter_name": "test_counter" } # queue=xxx } ), is_sync=False)
参数说明:
参数 | 描述 |
---|---|
name | 作业名称 |
head_cpu & head_memory | 设置Ray head node的cpu和memory,格式与k8s resource一致,例如 |
worker_cpu & worker_memory | 设置Ray worker node的cpu和memory,格式与k8s resource一致,例如 |
worker_replicas | 设置Ray worker node的节点个数 |
entrypoint_cmd | 用于运行作业的entrypoint shell command |
entrypoint_resource | 用于运行作业的相关资源文件,目前支持传入一个tos路径,作业执行时将会对该文件进行解压 |
runtime_env | 设置Ray作业的运行时环境,可用于指定环境变量/工作目录/pip安装依赖 |
is_sync | 设置作业是否是同步执行;设置为true,则execute函数将会同步阻塞直至作业结束,设置为false,则execute函数在任务提交后将会立即返回 |
通过is_sync 参数进行控制:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
task | Task | Y | 需要执行的任务 |
is_sync | bool | Y | 是否同步执行 |
timeout | int | N | 同步执行的超时时间,单位 :s |
# 异步 client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=False) # 同步执行,3 分钟超时 client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=True, timeout=180)
# 取消任务可以通过 job 实例;也可以通过 ServerlessClient 进行取消 job.cancel() client.cancel_job(job)
可以根据任务 ID 进行任务实例的获取:
job = client.get_job(jobId)
从拿到的任务实例获取任务对应的 Spark UI 页面:
job.get_tracking_url()
_job = _client.get_job('${job_id}') _log_cursor = _client.get_driver_log(_job) while _log_cursor.has_next(): _log_cursor.fetch_next_page() current_rows = _log_cursor.current_rows for log_entry in current_rows: print(log_entry)
# 查看提交日志 _log_cursor = _client.get_submission_log(_submission_failed_job) while _log_cursor.has_next(): _log_cursor.fetch_next_page() current_rows = _log_cursor.current_rows for log_entry in current_rows: print(log_entry)
异步调用后,如果想重新同步阻塞等待任务到达某种状态,可以尝试调用 wait_for()
函数:
# 等待任务结束 job.wait_for_finished() # 自定义结束状态 def when_to_exit() -> bool: return job.get_tracking_url() is not None job.wait_for(when_to_exit=_when_to_exit, timeout=180)
对job实例调用 get_result()
获取任务的查询结果:
result = job.get_result() # 或者由 serverless client 侧获取 result = client.get_result(job) for record in result: print("row: (%s, %s, %s)" % (record[0], record[1], record[2]))
任务异常将会以 QuerySdkError 的形式进行抛出,exception message 内携带具体的执行错误信息。