EMR Serverless Spark 是火山引擎 E-MapReduce(EMR)框架下 Serverless 形态的数据产品,提供开箱即用,完全兼容开源的 Spark 引擎能力。
DataLeap 支持对接 EMR Serverless Spark 实例,本地编写 Java 代码,上传到对象存储系统 TOS 中,通过在 DataLeap 可视化配置 Spark Java/Python 作业的方式,进行资源引用。并通过 LAS Foramtion 提供统一元数据服务和数据权限服务,内置高可用 Remote shuffle Service,支持直接读写对象存储系统 TOS,满足您定制化数据查询分析的需求。常应用于企业元数据仓库建设、混合云架构方案搭建等场景。
下面将为您介绍 DataLeap 中 EMR Serverless Spark Java/Python 任务的配置操作。
注意
在项目控制台管理界面中,如果新增或修改了引擎,那么在数据开发任务新建窗口中,需刷新整个 DataLeap 数据开发界面,才能看到新增或修改后的引擎任务类型。
新建任务完成后,您可在任务配置界面完成以下参数配置:
语言类型支持选择 Java、Python。
注意
语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。
注意
设置系统环境变量时,避免直接覆盖系统环境变量,请按照追加方式指定,例如PATH=$PATH:/home/lihua/apps/bin/
;
参数 | 说明 |
---|---|
Spark 参数 | |
Main Class | 语言类型选择 Java 时,需填写主类信息,如 org.apache.spark.examples.JavaSparkPi。 |
Conf参数 | 配置任务中需设置的一些 conf 参数,例如您可通过
您可通过以下两种方式来进行配置:
注意 EMR Serverless Spark Jar 任务访问 TOS 资源时,需进行 TOS 鉴权操作,详见 TOS 权限配置。因此您必须在 Conf 参数中增加有权限访问 TOS 存储桶的账号 AK/SK 信息,格式如下: 更多参数配置可参考:https://spark.apache.org/docs/latest/configuration.html |
任务参数 | |
自定义参数 | 输入任务中已定义的参数,多个参数以空格形式进行分隔,例如 param1 param2 param3,参数最终将以字符串形式传入,支持配置时间参数,如 '${DATE}'、'${HOUR}',自定义参数格式 {{param1}}。 |
任务产出数据登记,用于记录任务、数据血缘信息,并不会对代码逻辑造成影响。对于系统无法通过自动解析获取产出信息的其他任务,您可在此手动登记任务产出数据相关信息。
EMR Serverless Spark Java/Python 任务可手动登记其产出信息。如果任务含有 LAS Catalog 库表数据的产出,则强烈建议填写,以便后续维护任务数据血缘关系。
您手动填写的内容即为任务产出,支持填写多个。其他任务依赖时,您可在其设置调度依赖时,通过依赖推荐或手动添加的方式,便可依据此处 EMR Serverless Spark Java/Python 任务设置的产出库表名信息来搜索添加依赖。 具体登记内容包括以下数据类型:
任务配置完成后,在右侧导航栏中,单击调度配置按钮,进入调度配置窗口,您可以在此设置调度属性、依赖、任务输入输出参数等信息。详细参数设置详见:调度设置。
其中在调度基本信息 > 计算队列选择时,您可下拉选择项目控制台中已绑定的 Spark 队列类型,来执行 EMR Serverless Spark Java/Python 任务。
注意
EMR Serverless Spark Java/Python 作业选择包年包月中开启“SQL 专用资源”的队列类型执行时,需在作业执行时的 Spark 参数中,添加该参数 tqs.query.engine.type = sparkcli。
且队列设定的资源规格有以下要求:
更多计算队列操作详见队列管理。
任务配置完成后,您可单击操作栏中的保存和调试按钮,进行任务调试。
注意
查询结果确认无误后,单击上方操作栏中的保存和提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见:数据开发概述---离线任务提交。
注意
如果租户主账号或项目管理员在项目控制台>流水线管理中启用了流水线流程校验,则您需要确保提交的任务符合流水线扩展程序的校验规则,才能成功提交。详见4 流水线管理。
后续任务运维操作详见:离线任务运维。
以下示例将为您演示通过 EMR Serverless Spark Python 任务中 Python 语言方式,执行一段 Python 脚本来估算圆周率。
新建 EMR Serverless Spark Java/Python 任务,详见上方3 新建任务。
进入任务配置界面,语言类型选择 Python,引入资源类型选择 Python。
在代码编辑区域,编辑以下相关 Python 语句:
import sys from random import random from operator import add from pyspark.sql import SparkSession if __name__ == "__main__": """ Usage: pi [partitions] """ spark = SparkSession\ .builder\ .appName("PythonPi")\ .getOrCreate() partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 100000 * partitions def f(_: int) -> float: x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) spark.stop()
Python 脚本编辑完成后,您可按需配置 conf 参数或自定义参数,如设定分区数参数等。
任务配置完成后,您可单击操作栏中的保存和调试按钮,进行任务调试。
该示例将为您演示通过 EMR Serverless Spark Python 任务中 Python 语言方式,执行一段 Pyspark 脚本来访问 LAS Catalog 中元数据。
新建 EMR Serverless Spark Java/Python 任务,详见上方3 新建任务。
进入任务配置界面,语言类型选择 Python,引入资源类型选择 Python。
在代码编辑区域,编辑以下相关 Python 语句:
from pyspark.sql import SparkSession # 创建SparkSession对象 spark = SparkSession.builder \ .enableHiveSupport() \ .getOrCreate() df1 = spark.sql("insert into database.table_name partition (date='${date}') values(3,'kaijie',989)") df2 = spark.sql("select * from database.table_name where date='${date}';") df2.show() spark.stop()
通过该方式访问 TOS 中的 LAS Catalog 元数据资源时,在 Python 脚本编辑完成后,您还需在下方添加以下 Conf 参数,来通过 TOS 的鉴权:
任务配置完成后,您可单击操作栏中的保存和调试按钮,进行任务调试。