Before using the Python SDK, you need to install the SDK's dependency package locally from the wheel file.
Install the package:
pip3 install python_serverless-1.0.1-py3-none-any.whl
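As a quick sanity check, you can verify that the wheel installed correctly by importing the SDK classes used throughout this guide (the module names are taken from the examples below):

```python
# Verify the SDK installation by importing the classes used in the examples below.
from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient
from serverless.task import PySparkTask

print('EMR Serverless Python SDK imported successfully')
```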
PySpark tasks let you run customized data analysis by writing your own Python Spark applications. For details, see the PySpark job development guide.
PySparkTask is the interface the SDK provides for executing PySpark tasks:
Parameter | Type | Required | Description
---|---|---|---
script | str | Y | The Python script resource to run; pass a TOS path, e.g. 'tos://bucket/path/to/pyfile'
args | list | N | Arguments for the Spark application's main function; defaults to an empty list
name | str | N | Task name; if not specified, one is generated as SparkJarTask_${current_time}
conf | dict | N | Task configuration parameters; defaults to empty
queue | str | N | Name of the queue to run in; if omitted, the public queue is used
depend_jars | []str | N | Dependent JAR files, corresponding to the --jars option of spark-submit, e.g. ['tos://bucket/path/to/jar']
files | []str | N | Dependent files, corresponding to the --files option of spark-submit, e.g. ['tos://bucket/path/to/file']
archives | []str | N | Dependent archive files, corresponding to the --archives option of spark-submit, e.g. ['tos://bucket/path/to/archive']
pyfiles | []str | N | Dependent Python files, corresponding to the --py-files option of spark-submit, e.g. ['tos://bucket/path/to/pyfile']
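Putting these parameters together, constructing a task might look like the sketch below; every TOS path, the queue name, and the conf entry are illustrative placeholders:

```python
from serverless.task import PySparkTask

# Minimal sketch of a PySparkTask built from the parameters above.
# All TOS paths, the queue name, and the conf entry are placeholders.
task = PySparkTask(
    name='pyspark_demo_task',
    script='tos://bucket/path/to/pyfile',         # entry Python script on TOS
    args=['--inputFile', 'rel.txt'],              # main function arguments
    conf={'spark.executor.memory': '12g'},        # task configuration
    queue='my_queue',                             # omit to use the public queue
    depend_jars=['tos://bucket/path/to/jar'],     # --jars
    files=['tos://bucket/path/to/file'],          # --files
    archives=['tos://bucket/path/to/archive'],    # --archives
    pyfiles=['tos://bucket/path/to/pyfile'],      # --py-files
)
```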
Supported regions:

- cn-beijing
- cn-shanghai
- cn-guangzhou
- ap-southeast-1
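Whichever region you choose is passed to ServerlessClient together with your credentials; the service name and endpoint below follow the submission examples later in this document:

```python
from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient

# Sketch: build a client for one of the supported regions.
# Replace ak/sk with your actual Access Key and Secret Key.
ak = '<your AK>'
sk = '<your SK>'

client = ServerlessClient(
    credentials=StaticCredentials(ak, sk),
    service='emr_serverless',
    region='cn-beijing',                 # any region from the list above
    endpoint='open.volcengineapi.com',
)
```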
Category | Property | Description | Default value
---|---|---|---
Driver | spark.driver.memory | Driver heap memory size | 12g
Driver | spark.driver.memoryOverhead | Driver off-heap memory size | 4g
Driver | spark.driver.cores | Number of CPU cores for the driver | 4
Driver | spark.driver.maxResultSize | Maximum size of results returned to the driver | 3g
Executor | spark.executor.cores | Number of CPU cores per executor | 4
Executor | spark.executor.memory | Memory size per executor | 12g
Executor | spark.executor.memoryOverhead | Off-heap memory size per executor | 4g
Executor | spark.executor.instances | Number of executor instances (dynamic allocation is enabled by default, so this normally does not need to be set) | 1
Dynamic allocation | spark.dynamicAllocation.enabled | Whether dynamic allocation is enabled | true
Dynamic allocation | spark.dynamicAllocation.minExecutors | Minimum number of executors | 1
Dynamic allocation | spark.dynamicAllocation.maxExecutors | Maximum number of executors | 30
Parallelism and task scheduling | spark.sql.files.maxPartitionBytes | File size handled by a single map task | 268435456
Parallelism and task scheduling | spark.sql.shuffle.partitions | Number of partitions generated by shuffle operations | 200
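These defaults can be overridden per task through the conf parameter of PySparkTask; the commented-out resource settings in the submission example further below use the same keys. A short sketch with a placeholder script path:

```python
from serverless.task import PySparkTask

# Sketch: override the default driver/executor resources for one task.
# The TOS path is a placeholder.
task = PySparkTask(
    name='pyspark_custom_resources',
    script='tos://bucket/path/to/pyfile',
    conf={
        'spark.driver.cores': '4',
        'spark.driver.memory': '12g',
        'spark.driver.memoryOverhead': '4g',
        'spark.executor.cores': '4',
        'spark.executor.memory': '12g',
        'spark.executor.memoryOverhead': '4g',
    },
)
```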
from pyspark.sql import SparkSession

# Create the SparkSession
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("select 1").show()

spark.stop()
from serverless.task import PySparkTask
from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient
from serverless.enums import JobStatus
from serverless.exceptions import QuerySdkError
import time

ak = '<your actual AK>'
sk = '<your actual SK>'
region = 'cn-beijing'
service = 'emr_serverless'
endpoint = 'open.volcengineapi.com'


def submit():
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,
                              endpoint=endpoint)
    pyscript_resource = 'tos://emr-qa/lei/demo/spark/pyspark_demo_sql.py'
    # Custom arguments for the main function
    main_args = []
    task = PySparkTask(name='lei pyspark task test',
                       script=pyscript_resource,
                       args=main_args,
                       conf={'serverless.spark.access.key': ak,
                             'serverless.spark.secret.key': sk,
                             # Custom resources; by default the driver and each executor get 4c/16g
                             # 'spark.driver.cores': '4',
                             # 'spark.driver.maxResultSize': '100g',
                             # 'spark.driver.memory': '12g',
                             # 'spark.driver.memoryOverhead': '4g',
                             # 'spark.executor.cores': '4',
                             # 'spark.executor.memory': '12g',
                             # 'spark.executor.memoryOverhead': '4g',
                             # Custom image
                             'las.cluster.type': 'vke',
                             'spark.kubernetes.container.image': 'emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-spark/spark:online-SQL-1.0.0.103-20240627105213'
                             }
                       # queue=xxx; if omitted, the public queue is used
                       )
    print('start submit task')
    job = client.execute(task=task, is_sync=False)
    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            pass
        time.sleep(3)
    print('The task executed successfully!!!')
    print('Tracking ui: %s' % job.get_tracking_url())
    print('Result ui: %s' % job.get_result_url())


def get_conf():
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,
                              endpoint=endpoint)
    job = client.get_job(277548274)
    print('start_time: %s' % job.start_time)
    print('end_time: %s' % job.end_time)
    print('status: %s' % job.status)
    print('queue_name: %s' % job.queue_name)
    print('conf: %s' % job.conf)
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job():
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,
                              endpoint=endpoint)
    client.cancel_job(client.get_job(277548274))


if __name__ == "__main__":
    submit()
import argparse

from pyspark.sql import SparkSession
from pyspark import SparkFiles

# Create the SparkSession
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

# Parse the arguments passed in through the task's args parameter
parser = argparse.ArgumentParser(description='A sample program')
parser.add_argument('--name', help='task name')
parser.add_argument('--inputFile', help='input file')
parser.add_argument('--archiveName', help='archive name')
parser.add_argument('--archiveFile', help='file inside the archive')
args = parser.parse_args()

inputFile = args.inputFile
name = args.name
archiveName = args.archiveName
archiveFile = args.archiveFile

# Print the argument values
print('name:', name)
print('input file:', inputFile)
print('archive name:', archiveName)
print('archive file:', archiveFile)

print('start printing file content:', inputFile)
file_df = spark.sparkContext.textFile(inputFile)
file_df_data = file_df.collect()
# Resolve the local path of the distributed file
print('local path of %s: %s' % (inputFile, SparkFiles.get(inputFile)))
print("inputFile data:")
for line in file_df_data:
    print(line)

# Create sample data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Save the DataFrame as a CSV file
output_path = "tos://emr-qa/lei/lei_db/people_table.csv"
df.write.csv(output_path)

spark.stop()
from serverless.task import PySparkTask
from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient
from serverless.enums import JobStatus
from serverless.exceptions import QuerySdkError
import time

ak = '<your actual AK>'
sk = '<your actual SK>'
region = 'cn-beijing'
service = 'emr_serverless'
endpoint = 'open.volcengineapi.com'


def submit():
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,
                              endpoint=endpoint)
    pyscript_resource = 'tos://emr-qa/lei/demo/spark/pyspark_demo_sql.py'
    # Custom arguments for the main function
    main_args = ['--inputFile', 'rel.txt', '--archiveName', 'archives_data.zip', '--archiveFile', 'rel.txt']
    # Dependent files, corresponding to the --files option of spark-submit
    files = ['tos://emr-qa/lei/demo/spark/rel.txt']
    # Dependent archive files, corresponding to the --archives option of spark-submit
    archives = ['tos://emr-qa/lei/demo/spark/archives_data.zip']
    # Dependent Python files, corresponding to the --py-files option of spark-submit
    pyfiles = ['tos://emr-qa/lei/demo/spark/basic_statistics.zip']
    task = PySparkTask(name='lei pyspark task test',
                       script=pyscript_resource,
                       args=main_args,
                       conf={'serverless.spark.access.key': ak,
                             'serverless.spark.secret.key': sk,
                             # Custom image
                             'las.cluster.type': 'vke',
                             'spark.kubernetes.container.image': 'emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-spark/spark:online-SQL-1.0.0.103-20240627105213'
                             },
                       files=files,
                       archives=archives,
                       pyfiles=pyfiles
                       # queue=xxx; if omitted, the public queue is used
                       )
    print('start submit task')
    job = client.execute(task=task, is_sync=False)
    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            pass
        time.sleep(3)
    print('The task executed successfully!!!')
    print('Tracking ui: %s' % job.get_tracking_url())
    print('Result ui: %s' % job.get_result_url())


def get_conf(job_id: str):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,
                              endpoint=endpoint)
    job = client.get_job(job_id)
    print('start_time: %s' % job.start_time)
    print('end_time: %s' % job.end_time)
    print('status: %s' % job.status)
    print('queue_name: %s' % job.queue_name)
    print('conf: %s' % job.conf)
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,
                              endpoint=endpoint)
    client.cancel_job(client.get_job(job_id))


if __name__ == "__main__":
    submit()
To use a custom image, set the following options in the conf parameter of PySparkTask:

{
    'serverless.spark.access.key': 'xx',
    'serverless.spark.secret.key': 'xx',
    'las.cluster.type': 'vke',
    'spark.kubernetes.container.image': 'emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-spark/spark:online-SQL-1.0.0.103-20240708105213'
}
To attach dependent JAR files through conf, set las.spark.jar.depend.jars in the conf parameter of PySparkTask:

{
    'serverless.spark.access.key': 'xx',
    'serverless.spark.secret.key': 'xx',
    'las.spark.jar.depend.jars': '[{\"fileName\": \"tos://lei-las-formation/pyspark/spark-jar-demo-1.0-SNAPSHOT.jar\"}]'
}
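The depend_jars parameter of PySparkTask (which maps to the --jars option of spark-submit) is another way to attach JAR dependencies; a sketch with a placeholder entry script:

```python
from serverless.task import PySparkTask

# Sketch: attach the dependent JAR via the depend_jars parameter instead of conf.
# The entry script path is a placeholder.
task = PySparkTask(
    name='pyspark_depend_jar_demo',
    script='tos://bucket/path/to/pyfile',
    depend_jars=['tos://lei-las-formation/pyspark/spark-jar-demo-1.0-SNAPSHOT.jar'],
    conf={'serverless.spark.access.key': 'xx',
          'serverless.spark.secret.key': 'xx'},
)
```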
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Modules shipped to the executors through pyfiles
from basic_statistics.file_info import getFile1
from basic_statistics.file_info import get_all_files
from tools import func
from module1 import my_module1
from module2 import my_module2

# Create the SparkSession
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

get_all_files("./")
getFile1("/")

staff_data = [
    {"name": "Alice", "salary": 70000},
    {"name": "Bob", "salary": 60000},
    {"name": "Charlie", "salary": 50000},
    {"name": "David", "salary": 80000},
    {"name": "Eve", "salary": 75000},
    {"name": "Frank", "salary": 40000},
    {"name": "Grace", "salary": 95000},
    {"name": "Hannah", "salary": 30000},
]
df = spark.createDataFrame(staff_data)

# UDF that applies the tax function from the tools package
taxCut = udf(lambda salary: func.tax(salary), FloatType())
df.select("name", taxCut("salary").alias("final salary")).show()

# Create a DataFrame
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["value"])

# Use functions from the dependent modules
result_multiply = df.rdd.map(lambda x: my_module1.multiply(x[0], 5)).collect()
result_add = df.rdd.map(lambda x: my_module2.add(x[0], 10)).collect()
print("multiply results:", result_multiply)
print("add results:", result_add)

time.sleep(360)
spark.stop()
from serverless.task import PySparkTask
from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient
from serverless.enums import JobStatus
from serverless.exceptions import QuerySdkError
import time

ak = '<your actual AK>'
sk = '<your actual SK>'
region = 'cn-beijing'
service = 'emr_serverless'
endpoint = 'open.volcengineapi.com'


def submit():
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,
                              endpoint=endpoint)
    pyscript_resource = 'tos://emr-qa/lei/demo/spark/pyspark_demo_module.py'
    # Custom arguments for the main function
    main_args = []
    # Dependent Python modules, shipped via the --py-files option of spark-submit
    pyfiles = ['tos://emr-qa/lei/demo/spark/basic_statistics.zip',
               'tos://emr-qa/lei/demo/spark/tools.zip',
               'tos://emr-qa/lei/demo/spark/module1.zip',
               'tos://emr-qa/lei/demo/spark/module2.zip']
    task = PySparkTask(name='lei pyspark task test',
                       script=pyscript_resource,
                       args=main_args,
                       conf={'serverless.spark.access.key': ak,
                             'serverless.spark.secret.key': sk,
                             # Make the shipped zip archives importable on the executors
                             'spark.executorEnv.PYTHONPATH': '/opt/spark/work-dir/module1.zip:/opt/spark/work-dir/module2.zip:/opt/spark/work-dir/basic_statistics.zip:/opt/spark/work-dir/tools.zip',
                             },
                       pyfiles=pyfiles
                       )
    print('start submit task')
    job = client.execute(task=task, is_sync=False)
    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            pass
        time.sleep(3)
    print('The task executed successfully!!!')
    print('Tracking ui: %s' % job.get_tracking_url())
    print('Result ui: %s' % job.get_result_url())


def get_conf(job_id: str):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,
                              endpoint=endpoint)
    job = client.get_job(job_id)
    print('start_time: %s' % job.start_time)
    print('end_time: %s' % job.end_time)
    print('status: %s' % job.status)
    print('queue_name: %s' % job.queue_name)
    print('conf: %s' % job.conf)
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,
                              endpoint=endpoint)
    client.cancel_job(client.get_job(job_id))


if __name__ == "__main__":
    submit()
Note
Replace the TOS file paths in the commands with your actual TOS file paths.
To use a custom Python environment, set the following options in the conf parameter of PySparkTask:

{
    'serverless.spark.access.key': 'xx',
    'serverless.spark.secret.key': 'xx',
    'spark.unpackUseCommand.enabled': 'true',
    'las.spark.jar.depend.archives': '[{\"fileName\": \"tos://emr-qa/lei/demo/spark/python38_new.zip\"}]',
    'spark.pyspark.python': 'python38_new.zip/python38_new/bin/python3',
    'spark.pyspark.driver.python': 'python38_new/python38_new/bin/python3'
}
Note
Replace the TOS file paths in the commands with your actual TOS file paths.
# Install Miniconda
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh

# Configure the environment variable (add the export line to .bashrc)
vi .bashrc
export PATH="/root/miniconda3/bin:$PATH"
source .bashrc
# Build the Python 3.8 environment
conda create --name python38_new python=3.8
source activate python38_new
pip3 install pandas
pip3 install scipy
pip3 install scikit-learn

# Package the environment into a zip archive
which python3
cd /root/miniconda3/envs
zip -r python38_new.zip python38_new/
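After uploading python38_new.zip to TOS, the conf settings shown earlier can be attached to a PySparkTask submission. A minimal sketch, assuming the archive lives at the TOS path used above and with a placeholder entry script:

```python
from serverless.task import PySparkTask

# Sketch: run a PySpark script on the packaged Python 3.8 environment.
# The entry script path is a placeholder.
task = PySparkTask(
    name='pyspark_custom_python_env',
    script='tos://bucket/path/to/pyfile',
    conf={
        'serverless.spark.access.key': 'xx',
        'serverless.spark.secret.key': 'xx',
        'spark.unpackUseCommand.enabled': 'true',
        'las.spark.jar.depend.archives': '[{"fileName": "tos://emr-qa/lei/demo/spark/python38_new.zip"}]',
        'spark.pyspark.python': 'python38_new.zip/python38_new/bin/python3',
        'spark.pyspark.driver.python': 'python38_new/python38_new/bin/python3',
    },
)
```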
task = PySparkTask(name='pyspark hms demo',
                   script=pyscript_resource,
                   args=main_args,
                   conf={'serverless.spark.access.key': ak,
                         'serverless.spark.secret.key': sk,
                         'las.cross.vpc.access.enabled': 'true',
                         # Volcengine account (tenant) ID
                         'las.cross.vpc.accountId': 'xxx',
                         # VPC ID of the HMS private network
                         'las.cross.vpc.vpc.id': 'xxx',
                         # Subnet ID of the HMS private network
                         'las.cross.vpc.subnet.id': 'xxx',
                         # Security group ID of the HMS private network
                         'las.cross.vpc.security.group.id': 'xxx'}
                   )
task = PySparkTask(name='pyspark es demo',
                   script=pyscript_resource,
                   args=main_args,
                   depend_jars=['tos://emr-qa/lei/demo/spark/elasticsearch-spark-30_2.12-8.13.2.jar'],
                   conf={'serverless.spark.access.key': ak,
                         'serverless.spark.secret.key': sk,
                         'las.cluster.type': 'vke',
                         'las.cross.vpc.access.enabled': 'true',
                         # Volcengine account (tenant) ID
                         'las.cross.vpc.accountId': 'xxx',
                         # VPC ID of the ES private network
                         'las.cross.vpc.vpc.id': 'xxx',
                         # Subnet ID of the ES private network
                         'las.cross.vpc.subnet.id': 'xxx',
                         # Security group ID of the ES private network
                         'las.cross.vpc.security.group.id': 'xxx'}
                   )
Note
Replace the TOS file paths in the commands with your actual TOS file paths.
{
    'las.cluster.type': 'vke',
    'las.cross.vpc.access.enabled': 'true',
    # VPC ID of the user's private network
    'las.cross.vpc.vpc.id': 'vpc-xxx',
    # Subnet ID of the user's private network
    'las.cross.vpc.subnet.id': 'subnet-xxx',
    # Security group ID of the user's VPC
    'las.cross.vpc.security.group.id': 'sg-xxx',
    # Volcengine account (tenant) ID of the user
    'las.cross.vpc.accountId': 'xxx'
}
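As in the HMS and Elasticsearch examples above, these cross-VPC settings are merged into the conf of the task being submitted. A sketch with placeholder IDs and script path:

```python
from serverless.task import PySparkTask

# Sketch: enable cross-VPC access for a task by merging the settings above into conf.
# All IDs and the script path are placeholders.
task = PySparkTask(
    name='pyspark_cross_vpc_demo',
    script='tos://bucket/path/to/pyfile',
    conf={
        'serverless.spark.access.key': 'xx',
        'serverless.spark.secret.key': 'xx',
        'las.cluster.type': 'vke',
        'las.cross.vpc.access.enabled': 'true',
        'las.cross.vpc.vpc.id': 'vpc-xxx',
        'las.cross.vpc.subnet.id': 'subnet-xxx',
        'las.cross.vpc.security.group.id': 'sg-xxx',
        'las.cross.vpc.accountId': 'xxx',
    },
)
```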