本文通过演示在火山引擎 E-MapReduce(EMR)的 DolphinScheduler 中运行 Spark、Flink 任务的示例,帮助您更好地理解 DolphinScheduler 的使用。
在 DolphinScheduler 中,资源中心通常用于上传文件、UDF 函数和任务组管理。 EMR DolphinScheduler 中的资源中心基于同集群内的 Hadoop 集群,默认目录是:/dolphinscheduler
,由配置项 resource_upload_path
定义。关于如何修改服务配置参数,请参阅管理服务配置参数。
本示例中,我们会运行一个经典的 WordCount 程序,需要先将所需的 spark jar、flink jar 与 word 文本文件(见下文)上传到资源中心,然后在后续定义具体工作流时进行引用。
上传操作如下:
在运行 Spark SQL、Hive SQL 类型任务时,DolphinScheduler 要求在数据源中心中预先配置好数据源连接信息。这里以 EMR 3.x 版本的 Hadoop 类型集群中配置一个 Spark 数据源为例。
HA集群配置示例如下:
单击界面上方导航栏中的数据源中心按钮,进入数据源中心界面。
单击创建数据源,配置以下数据源信息:
配置项 | 示例 | 说明 |
---|---|---|
数据源 | spark | 下拉选择数据源类型,此处选择 spark 数据源类型。 |
数据源名称 | spark_source | 输入数据源名称信息。 |
描述 | HA 集群配置 | 输入该数据源的描述信息,方便后续管理。 |
IP 主机名 | emr-master-1-1,emr-master-2,emr-master-3 | 输入 spark 数据源的 master 名称。 |
端口 | 2181 | 填写对应的端口号信息。 |
用户名 | hive | Spark 数据源下对应的用户名信息 |
密码 | 对应的密码信息。 | |
数据库名 | default | 数据源下对应的数据库名信息。 |
Jdbc 链接参数 | {"serviceDiscoveryMode":"zooKeeper","zooKeeperNamespace":"midas/ha","auth":"LDAP"} | 输入 json 格式的连接参数,以 {"key1":"value1","key2":"value2"...} 格式输入,非必填。 |
以上使用的用户名密码可以从以下路径获取:
hive_admin
配置项内容;
hive_password
配置项内容。非 HA 集群配置示例如下:
配置项 | 示例 | 说明 |
---|---|---|
数据源 | spark | 下拉选择数据源类型,此处选择 spark 数据源类型。 |
数据源名称 | spark_data | 输入数据源名称信息。 |
描述 | 非 HA 集群配置 | 输入该数据源的描述信息,方便后续管理。 |
IP 主机名 | emr-master-1-1 | 输入 spark 数据源的 master 名称。 |
端口 | 10005 | 填写对应的端口号信息。 |
用户名 | hive | Spark 数据源下对应的用户名信息 |
密码 | 对应的密码信息。 | |
数据库名 | default | 数据源下对应的数据库名信息。 |
Jdbc 链接参数 | 输入 json 格式的连接参数,以 {"key1":"value1","key2":"value2"...} 格式输入,非必填。 |
拖动 SQL 任务类型到右边栏中,完成以下配置信息:
参数 | 说明 |
---|---|
节点名称 | 输入任务类型的节点名称信息。 |
运行标志 | 选择任务运行的状态,支持选择正常、禁止执行,若勾选“禁止执行”,运行工作流不会执行该任务。 |
描述 | 为当前节点填写任务描述信息,方便后续区分。 |
任务优先级 | 选择当前任务的优先级情况,从 HIGHEST 到 LOWSET 分为5个等级,您可根据实际情况进行选择,当 worker 线程数不足时,级别高的任务在执行队列中会优先执行,相同优先级的任务按照先进先出的顺序执行。 |
Worker 分组 | 下拉选择已创建成功的 Worker 分组信息。 |
环境名称 | 配置任务执行的环境,下拉选择已创建成功的环境名称。 |
任务组名称 | 通过任务组来管理任务实例占用的资源,避免占用太多资源导致了集群其他组件受到资源上的影响。您可在资源中心 > 任务组管理中,创建任务组信息。详见任务组管理。 |
组内优先级 | 设置该任务,在当前任务组内的优先级,在同个任务组内优先获得资源。 |
失败重试次数 | 任务失败重新提交的次数,可进行手动填充。 |
失败重试间隔 | 任务失败重新提交任务的时间间隔,可以进行手动填充。 |
延时执行时间 | 任务延迟执行的时间,以分为单位。 |
超时告警 | 设置超时告警、超时失败。当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。 |
数据源类型 | 下拉选择已创建成功的 Spark 数据源。 |
数据源实例 | 选择的测试连通性成功的 Spark 数据源实例信息。 |
SQL 类型---查询 |
|
SQL 类型---非查询 | 可执行多段SQL,并设置分段执行符号。 |
SQL 语句 | 输入 SQL 执行语句。 |
UDF 函数 | 对于 HIVE 类型的数据源,可以引用资源中心中创建的 UDF 函数,其他类型的数据源暂不支持UDF函数。 |
自定义参数 | SQL 任务类型,而存储过程是自定义参数顺序,给方法设置值自定义参数类型和数据类型,同存储过程任务类型一样。区别在于 SQL 任务类型自定义参数会替换 SQL 语句中${变量}。 |
前置 SQL 语句 | 前置 SQL 在 SQL 语句之前执行。 |
后置 SQL 语句 | 后置 SQL 在 SQL 语句之后执行。 |
前置任务 | 设置当前任务的前置(上游)任务,形成上下游依赖关系。 |
Spark SQL 与 Hive 一致,只需要在数据源选择的时候选择正确的类型以及实例即可。
SQL 语句示例:
-- SQL语句: create table if not exists dolphin_test_spark_x(id int);
作业配置完成后,您可执行以下步骤,执行配置的作业:
spark-sql show tables;
Flink 类型任务中统一包含了 SQL 与 JAR 两种提交方式。
工作流主要参数配置如下:
参数 | 示例值 | 说明 |
---|---|---|
程序类型 | JAVA | 支持选择 JAVA、SQL、SCALA、PYTHON 类型。 |
主函数的 Class | org.apache.flink.streaming.examples.wordcount.WordCount | 输入 Jar 包中的主函数 Class 信息。 |
主程序包 | Flink_test_jar.jar | 下拉选择已上传成功的 Jar 包资源。 |
部署方式 | pre-job/cluster | 支持 pre-job/cluster(Flink 版本大于 1.10 时可选)、application、local的部署方式 |
Flink 版本 |
| 支持选择 '<1.10'、'1.11'、'>=1.12' 三个 Flink 版本信息。 |
主程序参数 | -input hdfs://emr-master-1-1:8020/dolphinscheduler/项目名称/resources/wordcount文件.txt --output hdfs:/emr-master-1-1:8020/tmp/test_x | 根据实际的路径和文件名,填写主程序参数。 说明
|
资源 | wordcount文件.txt | 下拉选择已上传成功的文件信息。 |
运行 Flink SQL 任务只需要将程序类型选择为 SQL,并在脚本输入框中录入需要执行的 SQL 即可。关于 Flink 不同部署方式在 EMR 中的支持情况,请参阅 Flink 基础使用。
与 Flink JAR 提交的方式相同,Spark-submit 运行同样依赖通过资源中心上传的程序包和配套资源文件。
工作流主要参数配置如下:
参数 | 示例值 | 说明 |
---|---|---|
程序类型 | JAVA | 支持选择 JAVA、SQL、SCALA、PYTHON 类型。 |
Spark 版本 | SPARK2 | 根据实际场景,选择 Spark 的版本信息,支持选择 Spark1、Spark2。 |
主函数的 Class | org.apache.spark.examples.JavaWordCount | 输入 Jar 包中的主函数 Class 信息。 |
主程序包 | spark_test_jar.jar | 下拉选择已上传成功的 Jar 包资源。 |
部署方式 | cluster | 支持 cluster、client、local 的部署方式。 |
主程序参数 | /dolphinscheduelr/租户名/resources/资源中心文件夹名/文件名称 | 根据实际的租户名和文件名,填写主程序参数。 |
资源 | wordcount文件.txt | 下拉选择已上传成功的文件信息。 |
配置完成,并运行工作流任务成功后,您可登入到 YARN UI 界面,查看对应的执行结果: