You need to enable JavaScript to run this app.
导航
PySpark作业开发指南
最近更新时间:2024.05.16 20:59:43首次发布时间:2024.05.16 20:59:43

1 简介

为满足用户个性化数据查询分析的需求,EMR Serverless Spark支持用户编写Python,并提交PySpark作业。
PySpark作业使用队列中的通用资源,请检查队列中是否存在通用资源,详见:Spark Jar作业开发指南

2 提交作业

2.1命令方式

EMR Serverless Spark 支持通过Set参数的方式,提交一个pySpark作业,极简demo如下:
进入创建作业界面:

将下述命令输入SQL编辑器中,并提交:

set serverless.spark.access.key = AKxxxx;
set serverless.spark.secret.key = WV;
set tqs.query.engine.type = sparkjar;
set spark.jar.resource = tos://bucketname/path/pi.py;


pi.py中代码如下:

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_: int) -> float:
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()

您可以在Spark官方文档中查看更多的PySpark示例:PySpark Overview

该demo可以将您tos路径:tos://bucketname/path/pi.py 下的py代码文件作为您PySpark中的可执行Python代码,并提交至EMR Serverless Spark中。其中参数含义如下:

参数名称参数值
serverless.spark.access.key您访问LAS Formation库表以及TOS所需的Access Key,您可从IAM 访问密钥中拿到自己账户的AK
serverless.spark.secret.key您访问LAS Formation库表以及TOS所需的Secret Key
tqs.query.engine.type作业类型,这里固定为**sparkjar**
spark.jar.resource您的Python代码所存放的tos路径

提交后,您可以在SparkJar作业列表中,找到您的作业,并查看对应的日志和WebUI进行作业的运维:

2.2 Python SDK 方式

除了命令方式提交,EMR Serverless Spark还支持使用SDK方式提交
如何快速入门Python SDK,您可详细参考:Python Query SDK
这里提供一个极简的demo:

def execute_spark_task():
    from emr.client import ServerlessQueryClient
    from emr.auth  import StaticCredentials
    from emr.resource import JarResourceInfo
    from emr.task import SparkJarTask

    # 访问您名下TOS桶以及LAS Formation所需的AK
    ak = 'your ak'
    # 访问您名下TOS桶以及LAS Formation所需的SK
    sk = 'your sk'
    # EMR Serverless Spark所在的Region
    region = 'cn-beijing'
    # 火山官方endpoint
    endpoint = 'open.volcengineapi.com'
    # 将您TOS中:tos://emr-dev/test/pi.py 文件作为可执行的Py代码,提交至PySpark
    client = ServerlessQueryClient(credentials=StaticCredentials(ak, sk), region=region,
                       endpoint=endpoint)
    job = client.execute(task=SparkJarTask(name="Py Spark demo",
                   jar=JarResourceInfo.of('tos://emr-dev/test/pi.py'),
                   main_args=['arg_xxx', 'arg_yyy']
                   ), is_sync=False)

    def when_to_exit() -> bool:
        return job.get_tracking_url() is not None

    # 等待任务提交
    job.wait_for(when_to_exit=when_to_exit, timeout=180)
    # 获取任务TrackingURL
    print('Tracking Url: %s' % job.get_tracking_url())

    # 等待任务执行结束
    job.wait_for_finished()
    print('The task executed successfully.')

if __name__ == "__main__":
    execute_spark_task()

3 进阶操作

3.1 添加自定义Python文件/Python依赖

3.1.1适用场景:

主执行Python文件可能还依赖其他的Python文件,或通用的Python lib

3.1.2使用方法:

EMR ServerlessSpark支持通过命令的方式添加依赖的Python文件/Python依赖,在您的命令中,可以添加如下命令:

set las.spark.jar.depend.pyFiles=[{"fileName":"tos://xx.zip"}, {"fileName":"tos://xx.py"}]

即可将tos路径下:tos://xx.zip 和 tos://xx.py 两个文件加载到PySpark任务中,并可在Python中通过import引用
参数详细解释如下:

参数名称参数解释
las.spark.jar.depend.pyFilespy文件/lib 所在的tos路径,以json array的方式组织

依赖文件类型可以为PyFileZip两种。

  • 依赖PyFile类型资源

资源内容为一个Python脚本,在入口Python文件中引用该脚本时,作为module引入,module名称为文件名。
例:
依赖为start.py时,在主文件引用方式为

from start import xxx
  • 依赖Zip类型资源

资源内容为一个zip包,在入口Python文件引用zip包中的内容时,可以直接将zip包中内容作为package来使用。
例:
zip包目录结构为:

core/start.py
core/end.py

引用方式为

from core.start import xxx
from core.end import xxx

3.2添加自定义文件

3.2.1适用场景

主执行Python文件需要读取TOS上的文件,但是不想引入TOS SDK,想直接在代码执行Classpath读到文件

3.2.2使用方式

EMR Serverless Spark支持通过命令添加自定义文件,您可以通过命令将tos上的文件添加到PySpark的Classpath中,并通过Python代码进行读取,在您的命令中,可以添加如下命令:

set las.spark.jar.depend.files = [{"fileName":"tos://xx.txt"}]

即可将tos路径下:tos://xx.txt 文件加载到PySpark任务中
参数详解:

参数名称参数解释
las.spark.jar.depend.files文件所在的tos路径,以json array的方式组织

添加后,您可以使用PySpark自带的API进行文件的读写

方法参数返回值
SparkFiles.get('xxx')String类型参数,xx为访问名称返回文件的绝对路径
SparkFiles.getRootDirectory()返回文件所在父目录的绝对路径

3.3添加自定义Jar

3.3.1适用场景

通过PySpark Dataframe 访问其他数据源,例如访问火山ES、VeDB时,需要添加相应的connecto jar

下述demo表示通过Spark Dataframe读取mysql数据,此时就需要添加mysql jdbc 相关的Driver jar包

df = spark.read.format("jdbc") \
        .option("url", f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}") \
        .option("dbtable", "xx") \
        .option("user", MYSQL_USERNAME) \
        .option("password", MYSQL_PASSWORD) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("partitionColumn", PARTITION_COL) \
        .option("lowerBound", 1) \
        .option("upperBound", PARTITION_UPPER_BOUND) \
        .option("numPartitions", PARTITION_NUM) \
        .load()

3.3.2使用方式

EMR Serverless Spark支持添加自定义的Jar包,您可以通过命令中添加如下参数添加自定义的Jar至任务Classpath中:

set las.spark.jar.depend.jars = [{"fileName":"tos://emr-dev/elasticsearch-spark-35_2.12-8.13.2.jar"}]

参数详解:

参数名称参数解释
las.spark.jar.depend.jars依赖Jar文件所在的tos路径,以json array的方式组织

3.4使用自定义的Python环境

3.4.1适用场景

有自定义Python版本需求(EMR Serverless Spark默认使用Python3.7版本),Python环境依赖复杂
EMR Serverless Spark 支持使用自定义的Python环境,您可以自定义您的PySpark运行时环境,包括但不限于自定义Python版本、自定义Python依赖等。

3.4.2使用方式

注意

python编译环境要求:X86_64

可以使用virtualenv或者conda构造自定义Python版本以及依赖环境。

注意

  • 复杂依赖建议使用conda,python环境更全
  • 有些依赖包依赖底层非pip安装的一些实现,这些包使用virtualenv是不能打包到独立的python环境中的。
3.4.2.1构建独立python环境
  • 使用conda构建独立python环境
# 构造python版本为3.7.9的python环境
conda create -n python379 python=3.7.9
# 进入到该环境下
conda activate python379
# 安装pandas依赖
echo 'pandas' > requirements.txt
pip install -r requirements.txt
# 打包独立环境,产出zip包python379.zip
cd ${conda_home}/envs/python379 && zip -r python379.zip *
# 退出
conda deactivate
  • 使用virtualenv构建独立python环境
# 构造python版本为本地python3对应的python版本
virtualenv --python=$(which python3) --clear python379
# 进入到该环境下
source python379/bin/activate
# 安装koalas依赖
echo 'koalas' > requirements.txt
pip install -r requirements.txt
# 打包独立环境,产出zip包python379.zip
cd python379 && zip -r python379.zip *
# 退出
deactivate
3.4.2.2使用自定义python运行pyspark
  • 将python379.zip 上传至TOS中,这里路径假设为:tos://emr-dev/python379.zip

  • 指定pyFiles依赖,在pyFiles里面添加该资源依赖。

set las.spark.jar.depend.pyFiles=[{"fileName":"tos://emr-dev/python379.zip"}]
  • 指定archives依赖,在archives中增加该资源依赖。
set las.spark.jar.depend.archives=[{"fileName":"tos://emr-dev/python379.zip"}]
  • 设置python参数
set spark.unpackUseCommand.enabled = true;
set spark.pyspark.driver.python = python379/bin/python3;
set spark.pyspark.python = python379.zip/bin/python3;
参数名称参数值参数解释
spark.unpackUseCommand.enabledtrue使用command方式解压zip包,防止数据权限丢失
spark.pyspark.driver.pythonpython379/bin/python3driver使用Python,前缀路径为文件名
spark.pyspark.pythonpython379.zip/bin/python3executor使用Python,前缀路径为文件名+.zip
  • 在控制台提交任务,即可完成提交自定义python环境任务