Serverless PySpark Development Guide
Last updated: 2024.11.29 14:54:11 | First published: 2024.10.25 16:36:08

SDK Installation

Before using the Python SDK, install the dependency package the SDK needs locally from the wheel file below.
Installation package:

python_serverless-1.0.1-py3-none-any.whl

After downloading the file above, run the following command to install it. You can then use the Python SDK to submit jobs.

pip3 install python_serverless-1.0.1-py3-none-any.whl
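
After installation, you can run a quick import check. This is a minimal sketch; it only uses the module names that appear in the job-submission examples below.

# check_sdk.py - minimal import check for the installed SDK
from serverless.task import PySparkTask
from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient

# If these imports succeed, the wheel was installed correctly.
print('python_serverless SDK is available')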

SDK Parameter Reference

PySpark tasks let you write Python Spark applications to meet customized data analysis needs. For details, see the PySpark Job Development Guide.
PySparkTask is the interface the SDK provides for running PySpark tasks:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| script | str | Y | The Python script resource used when the task runs; pass a TOS path, e.g. 'tos://bucket/path/to/pyfile' |
| args | list | N | Arguments for the Spark application's main function; defaults to an empty list |
| name | str | N | Task name; if not specified, a name of the form SparkJarTask_${current_time} is generated |
| conf | dict | N | Task configuration parameters; defaults to empty |
| queue | str | N | Name of the queue to run on; the public queue is used if not set |
| depend_jars | []str | N | Dependent JAR files, corresponding to spark-submit's --jars option, e.g. ['tos://bucket/path/to/jar'] |
| files | []str | N | Dependent files, corresponding to spark-submit's --files option, e.g. ['tos://bucket/path/to/file'] |
| archives | []str | N | Dependent archive files, corresponding to spark-submit's --archives option, e.g. ['tos://bucket/path/to/archive'] |
| pyfiles | []str | N | Dependent Python files, corresponding to spark-submit's --py-files option, e.g. ['tos://bucket/path/to/pyfile'] |
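
A minimal construction sketch that ties the parameters above together (the TOS paths and the task name below are placeholders, not real resources):

from serverless.task import PySparkTask

# Only 'script' is required; every other argument below is optional.
task = PySparkTask(
    name='my pyspark task',                          # task name
    script='tos://bucket/path/to/main.py',           # entry script on TOS (placeholder path)
    args=['--inputFile', 'rel.txt'],                 # main-function arguments
    conf={'spark.executor.memory': '12g'},           # task configuration
    depend_jars=['tos://bucket/path/to/dep.jar'],    # --jars
    files=['tos://bucket/path/to/file.txt'],         # --files
    archives=['tos://bucket/path/to/env.zip'],       # --archives
    pyfiles=['tos://bucket/path/to/module.zip'],     # --py-files
)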

Creating a PySpark task

Common parameters

  • Supported regions:
    • cn-beijing
    • cn-shanghai
    • cn-guangzhou
    • ap-southeast-1
  • Spark parameters, listed in the table below.

| Category | Property | Description | Default |
| --- | --- | --- | --- |
| Driver | spark.driver.memory | Driver on-heap memory size | 12g |
| Driver | spark.driver.memoryOverhead | Driver off-heap memory size | 4g |
| Driver | spark.driver.cores | Number of CPU cores for the driver | 4 |
| Driver | spark.driver.maxResultSize | Maximum size of results returned to the driver | 3g |
| Executor | spark.executor.cores | Number of CPU cores per executor | 4 |
| Executor | spark.executor.memory | Memory size per executor | 12g |
| Executor | spark.executor.memoryOverhead | Off-heap memory size per executor | 4g |
| Executor | spark.executor.instances | Number of executor instances (dynamic allocation is enabled by default, so this normally does not need to be set) | 1 |
| Dynamic allocation | spark.dynamicAllocation.enabled | Whether dynamic allocation is enabled | true |
| Dynamic allocation | spark.dynamicAllocation.minExecutors | Minimum number of executors | 1 |
| Dynamic allocation | spark.dynamicAllocation.maxExecutors | Maximum number of executors | 30 |
| Parallelism and scheduling | spark.sql.files.maxPartitionBytes | Maximum number of bytes read by a single map task when reading files | 268435456 |
| Parallelism and scheduling | spark.sql.shuffle.partitions | Number of partitions generated for shuffle operations | 200 |
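
These properties can be overridden per task through the conf parameter of PySparkTask, as the complete example below also shows; a short sketch with illustrative values:

# Illustrative per-task overrides; pass this dict as the 'conf' argument of PySparkTask.
conf = {
    'spark.driver.memory': '12g',
    'spark.driver.memoryOverhead': '4g',
    'spark.executor.cores': '4',
    'spark.executor.memory': '12g',
    'spark.dynamicAllocation.maxExecutors': '30',
    'spark.sql.shuffle.partitions': '200',
}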

Running a SQL task

  • spark-demo-sql.py
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("select 1").show()

spark.stop()
  • spark-submit.py
from serverless.task import PySparkTask
from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient
from serverless.enums import JobStatus
from serverless.exceptions import QuerySdkError
import time

ak = '<your actual Access Key (AK)>'
sk = '<your actual Secret Key (SK)>'
region = 'cn-beijing'
service = 'emr_serverless'
endpoint = 'open.volcengineapi.com'


def submit():
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)
    pyscript_resource = 'tos://emr-qa/lei/demo/spark/pyspark_demo_sql.py'
    # Custom arguments for the main function
    main_args = []

    task = PySparkTask(name='lei pyspark task test',
                       script=pyscript_resource,
                       args=main_args,
                       conf={'serverless.spark.access.key': ak,
                             'serverless.spark.secret.key': sk,
                             # Use custom resources; by default the driver and executor are 4 cores / 16g
                             # 'spark.driver.cores': '4',
                             # 'spark.driver.maxResultSize': '100g',
                             # 'spark.driver.memory': '12g',
                             # 'spark.driver.memoryOverhead': '4g',
                             # 'spark.executor.cores': '4',
                             # 'spark.executor.memory': '12g',
                             # 'spark.executor.memoryOverhead': '4g',

                             # Use a custom image
                             'las.cluster.type': 'vke',
                             'spark.kubernetes.container.image': 'emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-spark/spark:online-SQL-1.0.0.103-20240627105213'
                             }
                       #queue='xxx'; the public queue is used if not set
                       )

    print('start submit task')
    job = client.execute(task=task, is_sync=False)
    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            pass
        time.sleep(3)

    print('The task finished, status: %s' % job.status)
    print('Tracking ui: %s' % job.get_tracking_url())
    print('Result ui: %s' % job.get_result_url())


def get_conf(job_id):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)
    job = client.get_job(job_id)

    print('start_time: %s' % job.start_time)
    print('end_time: %s' % job.end_time)
    print('status: %s' % job.status)
    print('queue_name: %s' % job.queue_name)
    print('conf: %s' % job.conf)
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)
    client.cancel_job(client.get_job(job_id))


if __name__ == "__main__":
    submit()

Accessing TOS files

  • spark-demo-tos.py
import argparse
from pyspark.sql import SparkSession
from pyspark import SparkFiles

# Create a SparkSession
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

parser = argparse.ArgumentParser(description='An example program')
parser.add_argument('--name', help='name')
parser.add_argument('--inputFile', help='input file')
parser.add_argument('--archiveName', help='archive name')
parser.add_argument('--archiveFile', help='file inside the archive')
args = parser.parse_args()
inputFile = args.inputFile
name = args.name
archiveName = args.archiveName
archiveFile = args.archiveFile
# Print the argument values
print('name:', name)
print('input file path:', inputFile)
print('archive name:', archiveName)
print('archive file:', archiveFile)

print('Printing the contents of the input file:', inputFile)
file_df = spark.sparkContext.textFile(inputFile)
file_df_data = file_df.collect()

# Resolve the local path of the file distributed via --files
print('local path of %s: %s' % (inputFile, SparkFiles.get(inputFile)))
print("inputFile data:")
for line in file_df_data:
    print(line)


# Create example data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Save the DataFrame as CSV files
output_path = "tos://emr-qa/lei/lei_db/people_table.csv"
df.write.csv(output_path)


spark.stop()
  • spark-submit.py
from serverless.task import PySparkTask
from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient
from serverless.enums import JobStatus
from serverless.exceptions import QuerySdkError
import time

ak = '<your actual Access Key (AK)>'
sk = '<your actual Secret Key (SK)>'
region = 'cn-beijing'
service = 'emr_serverless'
endpoint = 'open.volcengineapi.com'


def submit():
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)
    pyscript_resource = 'tos://emr-qa/lei/demo/spark/pyspark_demo_sql.py'  # replace with the TOS path of your own script
    # Custom arguments for the main function
    main_args = ['--inputFile', 'rel.txt', '--archiveName', 'archives_data.zip', '--archiveFile', 'rel.txt']
    # Dependent files, corresponding to spark-submit's --files option
    files = ['tos://emr-qa/lei/demo/spark/rel.txt']
    # Dependent archive files, corresponding to spark-submit's --archives option
    archives = ['tos://emr-qa/lei/demo/spark/archives_data.zip']
    # Dependent Python files, corresponding to spark-submit's --py-files option
    pyfiles = ['tos://emr-qa/lei/demo/spark/basic_statistics.zip']

    task = PySparkTask(name='lei pyspark task test',
                       script=pyscript_resource,
                       args=main_args,
                       conf={'serverless.spark.access.key': ak,
                             'serverless.spark.secret.key': sk,

                             # Use a custom image
                             'las.cluster.type': 'vke',
                             'spark.kubernetes.container.image': 'emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-spark/spark:online-SQL-1.0.0.103-20240627105213'
                             },
                       files=files,
                       archives=archives,
                       pyfiles=pyfiles
                       #queue='xxx'; the public queue is used if not set
                       )

    print('start submit task')
    job = client.execute(task=task, is_sync=False)
    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            pass
        time.sleep(3)

    print('The task finished, status: %s' % job.status)
    print('Tracking ui: %s' % job.get_tracking_url())
    print('Result ui: %s' % job.get_result_url())


def get_conf(job_id: str):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)
    job = client.get_job(job_id)

    print('start_time: %s' % job.start_time)
    print('end_time: %s' % job.end_time)
    print('status: %s' % job.status)
    print('queue_name: %s' % job.queue_name)
    print('conf: %s' % job.conf)
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)
    client.cancel_job(client.get_job(job_id))

if __name__ == "__main__":
    submit()

Using a custom image

  • Configure the conf parameter of PySparkTask.
{
  'serverless.spark.access.key': 'xx',
  'serverless.spark.secret.key': 'xx',
  'las.cluster.type': 'vke',
  'spark.kubernetes.container.image': 'emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-spark/spark:online-SQL-1.0.0.103-20240708105213'
}

Using custom JAR dependencies

  • Configure the conf parameter.
{
  'serverless.spark.access.key': 'xx',
  'serverless.spark.secret.key': 'xx',
  'las.spark.jar.depend.jars': '[{\"fileName\": \"tos://lei-las-formation/pyspark/spark-jar-demo-1.0-SNAPSHOT.jar\"}]'
}
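
The parameter table above also lists a depend_jars argument that maps to spark-submit's --jars option (it is used in the ES example later). A sketch, assuming the same JAR can be attached that way; the paths are placeholders:

# Attach the dependency JAR through depend_jars instead of the conf entry above.
task = PySparkTask(name='pyspark jar demo',
                   script='tos://bucket/path/to/main.py',              # placeholder script path
                   depend_jars=['tos://bucket/path/to/your-dep.jar'],  # placeholder JAR path
                   conf={'serverless.spark.access.key': ak,
                         'serverless.spark.secret.key': sk})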

Using custom modules

  • spark-demo-module.py
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

from basic_statistics.file_info import getFile1
from basic_statistics.file_info import get_all_files
from tools import func
from module1 import my_module1
from module2 import my_module2

# Create a SparkSession
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

get_all_files("./")
getFile1("/")


staff_data = [
    {"name": "Alice", "salary": 70000},
    {"name": "Bob", "salary": 60000},
    {"name": "Charlie", "salary": 50000},
    {"name": "David", "salary": 80000},
    {"name": "Eve", "salary": 75000},
    {"name": "Frank", "salary": 40000},
    {"name": "Grace", "salary": 95000},
    {"name": "Hannah", "salary": 30000},
]
df = spark.createDataFrame(staff_data)

taxCut = udf(lambda salary: func.tax(salary), FloatType())
df.select("name", taxCut("salary").alias("final salary")).show()


# Create a DataFrame
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["value"])

# Use functions from the custom modules
result_multiply = df.rdd.map(lambda x: my_module1.multiply(x[0], 5)).collect()
result_add = df.rdd.map(lambda x: my_module2.add(x[0], 10)).collect()
print('result_multiply:', result_multiply)
print('result_add:', result_add)

# Keep the application alive for a while so the Spark UI can be inspected
time.sleep(360)
spark.stop()
  • Client code for submitting the Spark task.
from serverless.task import PySparkTask
from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient
from serverless.enums import JobStatus
from serverless.exceptions import QuerySdkError
import time

ak = '<your actual Access Key (AK)>'
sk = '<your actual Secret Key (SK)>'
region = 'cn-beijing'
service = 'emr_serverless'
endpoint = 'open.volcengineapi.com'


def submit():
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)
    pyscript_resource = 'tos://emr-qa/lei/demo/spark/pyspark_demo_module.py'
    # Custom arguments for the main function
    main_args = []
  
    pyfiles = ['tos://emr-qa/lei/demo/spark/basic_statistics.zip', 'tos://emr-qa/lei/demo/spark/tools.zip',
               'tos://emr-qa/lei/demo/spark/module1.zip',
               'tos://emr-qa/lei/demo/spark/module2.zip']
    task = PySparkTask(name='lei pyspark task test',
                       script=pyscript_resource,
                       args=main_args,
                       conf={'serverless.spark.access.key': ak,
                             'serverless.spark.secret.key': sk,
                             'spark.executorEnv.PYTHONPATH': '/opt/spark/work-dir/module1.zip:/opt/spark/work-dir/module2.zip:/opt/spark/work-dir/basic_statistics.zip:/opt/spark/work-dir/tools.zip',
                             },
                       pyfiles=pyfiles
                       )

    print('start submit task')
    job = client.execute(task=task, is_sync=False)
    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            pass
        time.sleep(3)

    print('The task finished, status: %s' % job.status)
    print('Tracking ui: %s' % job.get_tracking_url())
    print('Result ui: %s' % job.get_result_url())


def get_conf(job_id: str):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)
    job = client.get_job(job_id)

    print('start_time: %s' % job.start_time)
    print('end_time: %s' % job.end_time)
    print('status: %s' % job.status)
    print('queue_name: %s' % job.queue_name)
    print('conf: %s' % job.conf)
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)
    client.cancel_job(client.get_job(job_id))


if __name__ == "__main__":
    submit()

Note

Replace the TOS file paths in the commands with your actual TOS file paths.

  • The dependency packages are as follows:
basic_statistics.zip
module1.zip
module2.zip
tools.zip
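
For reference, these zip packages only need to contain ordinary Python packages whose top-level names match the imports in spark-demo-module.py. The file contents below are hypothetical and shown only to illustrate the expected layout (each package directory also needs an __init__.py):

# tools/func.py - hypothetical content matching the 'func.tax(salary)' call above
def tax(salary):
    # toy rule used only for illustration; must return a float for the FloatType UDF
    return float(salary) * 0.9

# module1/my_module1.py - hypothetical content matching 'my_module1.multiply(a, b)'
def multiply(a, b):
    return a * b

# module2/my_module2.py - hypothetical content matching 'my_module2.add(a, b)'
def add(a, b):
    return a + b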

Using a custom conda environment

  • Configure the conf parameter of PySparkTask.
{
 'serverless.spark.access.key': '{access key}',
 'serverless.spark.secret.key': '{secret key}',
 'spark.unpackUseCommand.enabled': 'true',
 'las.spark.jar.depend.archives': '[{\"fileName\": \"tos://emr-qa/lei/demo/spark/python38_new.zip\"}]',
 'spark.pyspark.python': 'python38_new.zip/python38_new/bin/python3',
 'spark.pyspark.driver.python': 'python38_new.zip/python38_new/bin/python3'
}

Note

Replace the TOS file paths in the commands with your actual TOS file paths.

  • Install conda.
# Install Miniconda
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh
# Add conda to the PATH environment variable
vi .bashrc
export PATH="/root/miniconda3/bin:$PATH"
source .bashrc
  • Build a Python 3.8 environment with conda.
# Build the Python 3.8 environment
conda create --name python38_new python=3.8
source activate python38_new
pip3 install pandas
pip3 install scipy
pip3 install scikit-learn

# Package the environment into a zip archive
which python3
cd /root/miniconda3/envs
zip -r python38_new.zip python38_new/
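
A submission sketch that combines the packaged environment with the conf shown above (a minimal example; the TOS path must point to your own python38_new.zip, and pyscript_resource, ak, sk and client are defined as in the earlier submit examples):

# Run the PySpark script on the packaged conda environment.
task = PySparkTask(name='pyspark conda demo',
                   script=pyscript_resource,
                   conf={'serverless.spark.access.key': ak,
                         'serverless.spark.secret.key': sk,
                         'spark.unpackUseCommand.enabled': 'true',
                         'las.spark.jar.depend.archives': '[{"fileName": "tos://emr-qa/lei/demo/spark/python38_new.zip"}]',
                         # driver and executors both use the interpreter inside the unpacked archive
                         'spark.pyspark.python': 'python38_new.zip/python38_new/bin/python3',
                         'spark.pyspark.driver.python': 'python38_new.zip/python38_new/bin/python3'})
job = client.execute(task=task, is_sync=False)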

Other usage

Accessing HMS from Serverless Spark

  • Pseudocode (Python SDK)
task = PySparkTask(name='pyspark hms demo',
                   script=pyscript_resource,
                   args=main_args,
                   conf={'serverless.spark.access.key': ak,
                         'serverless.spark.secret.key': sk,
                         'las.cross.vpc.access.enabled':'true',
                         # Volcengine account (tenant) ID
                         'las.cross.vpc.accountId':'xxx',
                         # Volcengine HMS VPC ID
                         'las.cross.vpc.vpc.id':'xxx',
                         # Volcengine HMS VPC subnet ID
                         'las.cross.vpc.subnet.id':'xxx',
                         # Volcengine HMS VPC security group ID
                         'las.cross.vpc.security.group.id':'xxx'}
                   )
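
On the script side no extra code is needed beyond Hive support; a minimal sketch of a PySpark script that reads from the external HMS (the database and table names are placeholders):

from pyspark.sql import SparkSession

# Hive support makes tables registered in the connected HMS visible to Spark SQL.
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("show databases").show()
# Placeholder database/table names
spark.sql("select * from demo_db.demo_table limit 10").show()

spark.stop()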

Accessing Volcengine ES from Serverless Spark

  • Main steps:
    1. Connect the Serverless Spark network with the Volcengine ES VPC.
    2. Upload the ES dependency JAR to TOS under your account.
    3. When submitting the Spark task, include the path of the ES dependency JAR in depend_jars.
  • Pseudocode (Python SDK)
task = PySparkTask(name='pyspark es demo',
                   script=pyscript_resource,
                   args=main_args,
                   depend_jars=['tos://emr-qa/lei/demo/spark/elasticsearch-spark-30_2.12-8.13.2.jar'],
                   conf={'serverless.spark.access.key': ak,
                         'serverless.spark.secret.key': sk,
                         'las.cluster.type':'vke',
                         'las.cross.vpc.access.enabled':'true',
                         # Volcengine account (tenant) ID
                         'las.cross.vpc.accountId':'xxx',
                         # Volcengine ES VPC ID
                         'las.cross.vpc.vpc.id':'xxx',
                         # Volcengine ES VPC subnet ID
                         'las.cross.vpc.subnet.id':'xxx',
                         # Volcengine ES VPC security group ID
                         'las.cross.vpc.security.group.id':'xxx'}
                   )

Note

Replace the TOS file paths in the commands with your actual TOS file paths.
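
For the script side, a rough sketch of writing a DataFrame to ES with the elasticsearch-spark connector attached above. The host, credentials and index name are placeholders, and the connector options shown are standard elasticsearch-hadoop settings rather than options specific to this service:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

# Write to ES via the elasticsearch-spark connector (placeholder endpoint and credentials).
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "your-es-host") \
    .option("es.port", "9200") \
    .option("es.net.http.auth.user", "your-user") \
    .option("es.net.http.auth.pass", "your-password") \
    .option("es.nodes.wan.only", "true") \
    .mode("append") \
    .save("demo_index")

spark.stop()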

Cross-VPC access

  • Create a secondary network interface in your VPC and bind it to the specified security group.
  • The driver and executor pods that are launched are bound to the secondary network interface from the previous step, enabling cross-VPC access.
  • Configure the conf parameter.
{
  'las.cluster.type': 'vke',
  'las.cross.vpc.access.enabled': 'true',
  # Volcengine - your VPC ID
  'las.cross.vpc.vpc.id': 'vpc-xxx',
  # Volcengine - your subnet ID
  'las.cross.vpc.subnet.id': 'subnet-xxx',
  # Volcengine - your VPC security group ID
  'las.cross.vpc.security.group.id': 'sg-xxx',
  # Volcengine - your account (tenant) ID
  'las.cross.vpc.accountId': 'xxx'
}