You need to enable JavaScript to run this app.
导航
EMR Serverless Spark Java/Python
最近更新时间:2025.01.17 19:37:58首次发布时间:2024.05.24 15:27:43

1 概述

EMR Serverless Spark 是火山引擎 E-MapReduce(EMR)框架下 Serverless 形态的数据产品,提供开箱即用,完全兼容开源的 Spark 引擎能力。
DataLeap 支持对接 EMR Serverless Spark 实例,本地编写 Java 代码,上传到对象存储系统 TOS 中,通过在 DataLeap 可视化配置 Spark Java/Python 作业的方式,进行资源引用。并通过 LAS Foramtion 提供统一元数据服务和数据权限服务,内置高可用 Remote shuffle Service,支持直接读写对象存储系统 TOS,满足您定制化数据查询分析的需求。常应用于企业元数据仓库建设、混合云架构方案搭建等场景。
下面将为您介绍 DataLeap 中 EMR Serverless Spark Java/Python 任务的配置操作。

2 使用前提

  1. 目前 DataLeap 绑定 EMR Serverless Spark 实例处于白名单使用阶段,您可通过提工单的方式,请 DataLeap 支持同学进行白名单开通使用。
  2. 已开通 EMR Serverless Spark 队列资源实例。详见队列管理
  3. 需开通 DataLeap 服务版本中大数据分析、DataOps敏捷研发分布式数据自治的服务,项目方可继续绑定 EMR Serverless Spark 实例。详见版本服务说明
  4. 已在 DataLeap 项目控制台中,绑定相应的 EMR Serverless Spark 服务实例。详见创建项目
  5. 已开通对象存储 TOS 服务,并创建存储桶,且在相应存储桶中上传 JAR 资源文件。操作详见创建存储桶上传文件
  6. 子用户访问 EMR Serverless Spark 队列资源时,需确保拥有 EMRServerlessFullAccessEMRServerlessReadOnlyAccess 权限策略,子用户可请主账号在访问控制界面进行权限策略添加。

3 新建任务

  1. 登录 DataLeap租户控制台
  2. 概览界面,显示加入的项目中,单击数据开发进入对应项目。
  3. 任务开发界面,左侧导航栏中,单击新建任务按钮,进入新建任务页面。
  4. 选择任务类型:
    1. 分类:数据开发
    2. 绑定引擎:EMR Serverless Spark
    3. 关联实例:显示项目绑定时的集群实例信息。

      注意

      在项目控制台管理界面中,如果新增或修改了引擎,那么在数据开发任务新建窗口中,需刷新整个 DataLeap 数据开发界面,才能看到新增或修改后的引擎任务类型。

    4. 选择任务:离线数据 EMR Serverless Spark Java/Python
  5. 填写任务基本信息:
    1. 任务名称:输入任务的名称,只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。
    2. 保存至:选择任务存放的目标文件夹目录。
  6. 单击确定按钮,成功创建任务。

Image

4 任务配置说明

新建任务完成后,您可在任务配置界面完成以下参数配置:

4.1 语言设置

语言类型支持选择 Java、Python。

注意

语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。

4.2 引入资源

  • 语言类型选择 Java:
    EMR Serverless Spark Java 作业,将直接使用已上传到 TOS 的资源包,您可直接输入已上传的资源在 TOS 中的路径信息,如:tos://test-tos/emr-spark/tos-jar/spark-examples-2.3.4.jar。
    TOS 中上传文件操作详见文件上传
  • 语言类型选择 Python 时:
    • 资源类型默认选择 Python 类型。
    • 在编辑器中输入 Python 语句,执行引擎只支持 Python3.7。

    注意

    设置系统环境变量时,避免直接覆盖系统环境变量,请按照追加方式指定,例如PATH=$PATH:/home/lihua/apps/bin/;

4.3 参数配置

参数

说明

Spark 参数

Main Class

语言类型选择 Java 时,需填写主类信息,如 org.apache.spark.examples.JavaSparkPi。

Conf参数

配置任务中需设置的一些 conf 参数,例如您可通过spark.yarn.appMasterEnvspark.executorEnv 参数,来分别设置 driver、executor 环境变量参数:

spark.yarn.appMasterEnv.PYTHONPATH="$PYTHONPATH:/xxx"
spark.executorEnv.PYTHONPATH="$PYTHONPATH:/xxx"

您可通过以下两种方式来进行配置:

  • 单行编辑模式:在对应输入框中,输入参数的 key-value 值。
  • 脚本编辑模式:支持 JSON、Yaml 的格式,直接用脚本方式进行配置参数。

Image

注意

EMR Serverless Spark Jar 任务访问 TOS 资源时,需进行 TOS 鉴权操作,详见 TOS 权限配置。因此您必须在 Conf 参数中增加有权限访问 TOS 存储桶的账号 AK/SK 信息,格式如下:
serverless.spark.access.key: xxxxxxx
serverless.spark.secret.key: xxxxxxx
您可进入火山引擎,访问控制台的密钥管理界面,复制 Access key ID、 Secret Access Key 信息。如果是子用户,请联系主账号获取密钥。详见 AK 秘钥管理
秘钥 AK/SK 这类敏感信息,支持您以自定义项目参数形式,进行加密配置,如{{AK}}、{{SK}}。具体加密操作详见参数信息设置

更多参数配置可参考:https://spark.apache.org/docs/latest/configuration.html

任务参数

自定义参数

输入任务中已定义的参数,多个参数以空格形式进行分隔,例如 param1 param2 param3,参数最终将以字符串形式传入,支持配置时间参数,如 '${DATE}'、'${HOUR}',自定义参数格式 {{param1}}。
时间参数详见平台时间变量与常量说明,上下文传参操作详见输入输出参数设置

4.4 任务产出数据登记

任务产出数据登记,用于记录任务、数据血缘信息,并不会对代码逻辑造成影响。对于系统无法通过自动解析获取产出信息的其他任务,您可在此手动登记任务产出数据相关信息。
EMR Serverless Spark Java/Python 任务可手动登记其产出信息。如果任务含有 LAS Catalog 库表数据的产出,则强烈建议填写,以便后续维护任务数据血缘关系。
您手动填写的内容即为任务产出,支持填写多个。其他任务依赖时,您可在其设置调度依赖时,通过依赖推荐手动添加的方式,便可依据此处 EMR Serverless Spark Java/Python 任务设置的产出库表名信息来搜索添加依赖。 具体登记内容包括以下数据类型:

  • EMR Serverless Spark:该任务逻辑会将数据写入到 LAS Catalog 表,需填写 LAS Catalog 的数据库名、表名、分区名,分区内容可以使用变量,如 ${date}、${hour} 形式。
  • 其他:该任务逻辑不写数据到 LAS Catalog 表。

5 调度设置

任务配置完成后,在右侧导航栏中,单击调度配置按钮,进入调度配置窗口,您可以在此设置调度属性、依赖、任务输入输出参数等信息。详细参数设置详见:调度设置
其中在调度基本信息 > 计算队列选择时,您可下拉选择项目控制台中已绑定的 Spark 队列类型,来执行 EMR Serverless Spark Java/Python 任务。
Image

注意

EMR Serverless Spark Java/Python 作业选择包年包月中开启“SQL 专用资源”的队列类型执行时,需在作业执行时的 Spark 参数中,添加该参数 tqs.query.engine.type = sparkcli。
且队列设定的资源规格有以下要求:

  • 若队列容量类型为固定容量,设定的队列规格,必须要大于 SQL 专用资源中分配的资源规格,确保其有剩余的通用资源,来运行 Spark Java/Python 任务。
    Image
  • 若队列容量类型为弹性容量,设定的 MIN 队列规格,必须要大于 SQL 专用资源中分配的资源规格,确保其有剩余的通用资源,来运行 Spark Java/Python 任务。
    Image

更多计算队列操作详见队列管理

6 调试运行

任务配置完成后,您可单击操作栏中的保存调试按钮,进行任务调试。

注意

  • 调试操作,直接使用线上数据进行调试,需谨慎操作。
  • 本任务类型支持调试执行成功或失败后发送消息通知,您可根据业务情况,前往项目控制台 > 配置信息 > 消息通知设置中,选择是否开启任务调试运行成功失败通知。
    • 默认通知方式为邮箱,您需在“账号管理”中,提前绑定相应的安全邮箱信息;
    • 您也可根据业务需要,自行配置飞书应用机器人,通过飞书的方式发送消息通知,飞书消息通知前置操作详见1.1 飞书应用机器人创建

7 提交任务

查询结果确认无误后,单击上方操作栏中的保存提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见:数据开发概述---离线任务提交

注意

如果租户主账号或项目管理员在项目控制台>流水线管理中启用了流水线流程校验,则您需要确保提交的任务符合流水线扩展程序的校验规则,才能成功提交。详见4 流水线管理

后续任务运维操作详见:离线任务运维

8 使用示例

8.1 估算圆周率示例

8.1.1 任务配置

以下示例将为您演示通过 EMR Serverless Spark Python 任务中 Python 语言方式,执行一段 Python 脚本来估算圆周率。

  1. 新建 EMR Serverless Spark Java/Python 任务,详见上方3 新建任务

  2. 进入任务配置界面,语言类型选择 Python,引入资源类型选择 Python
    Image

  3. 在代码编辑区域,编辑以下相关 Python 语句:

    import sys
    from random import random
    from operator import add
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        """
            Usage: pi [partitions]
        """
        spark = SparkSession\
            .builder\
            .appName("PythonPi")\
            .getOrCreate()
    
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
        n = 100000 * partitions
    
        def f(_: int) -> float:
            x = random() * 2 - 1
            y = random() * 2 - 1
            return 1 if x ** 2 + y ** 2 <= 1 else 0
    
        count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
        print("Pi is roughly %f" % (4.0 * count / n))
    
        spark.stop()
    
  4. Python 脚本编辑完成后,您可按需配置 conf 参数或自定义参数,如设定分区数参数等。

8.1.2 调试运行

任务配置完成后,您可单击操作栏中的保存调试按钮,进行任务调试。

8.1.3 查看日志输出结果

  1. 待任务执行成功后,您可在调试记录-概览界面,获取以下 TrackingURL 日志链接信息。
    Image
  2. 在 Spark 日志界面,单击 Executors 页签。
    Image
  3. Logs 列中,单击 stdout 按钮,进入查看任务执行结果。
    Image
    Image

8.2 查询 LAS Catalog 元数据示例

8.1.1 任务配置

该示例将为您演示通过 EMR Serverless Spark Python 任务中 Python 语言方式,执行一段 Pyspark 脚本来访问 LAS Catalog 中元数据。

  1. 新建 EMR Serverless Spark Java/Python 任务,详见上方3 新建任务

  2. 进入任务配置界面,语言类型选择 Python,引入资源类型选择 Python

  3. 在代码编辑区域,编辑以下相关 Python 语句:

    from pyspark.sql import SparkSession
    
    # 创建SparkSession对象
    spark = SparkSession.builder \
        .enableHiveSupport() \
        .getOrCreate()
    
    df1 = spark.sql("insert into database.table_name partition (date='${date}') values(3,'kaijie',989)")
    df2 = spark.sql("select * from database.table_name where date='${date}';")
    df2.show()
    
    spark.stop()
    

    Image

  4. 通过该方式访问 TOS 中的 LAS Catalog 元数据资源时,在 Python 脚本编辑完成后,您还需在下方添加以下 Conf 参数,来通过 TOS 的鉴权:

    • serverless.spark.access.key: xxxxxxx
    • serverless.spark.secret.key: xxxxxxx
      Image
      conf 参数说明详见4.3 参数配置

8.1.2 调试运行

任务配置完成后,您可单击操作栏中的保存调试按钮,进行任务调试。

8.1.3 查看日志输出结果

  1. 待任务执行成功后,您可在调试记录-日志界面,获取以下 Spark 日志链接信息。
    Image
  2. 在 Spark 日志界面,单击 Executors 页签。
    Image
  3. Logs 列中,单击 stdout 按钮,进入查看任务执行结果。
    Image
    Image