You need to enable JavaScript to run this app.
导航
集成示例
最近更新时间:2024.07.09 17:47:11首次发布时间:2023.02.23 11:06:57

本文通过演示在火山引擎 E-MapReduce(EMR)的 DolphinScheduler 中运行 Spark、Flink 任务的示例,帮助您更好地理解 DolphinScheduler 的使用。

1 使用前提

  1. 已创建包含 DolphinScheduler 组件的 EMR 集群。详见创建集群
  2. DolphinScheduler 组件服务,需要为 emr-master-1-1 机器实例绑定弹性公网 IP,并配置服务端口(DolphinScheduler 默认的服务端口是 12345)才能访问 DolphinScheduler Web UI,详见访问链接

2 资源中心

在 DolphinScheduler 中,资源中心通常用于上传文件、UDF 函数和任务组管理。 EMR DolphinScheduler 中的资源中心基于同集群内的 Hadoop 集群,默认目录是:/dolphinscheduler,由配置项 resource_upload_path 定义。关于如何修改服务配置参数,请参阅管理服务配置参数
本示例中,我们会运行一个经典的 WordCount 程序,需要先将所需的 spark jar、flink jar 与 word 文本文件(见下文)上传到资源中心,然后在后续定义具体工作流时进行引用。

  • Spark jar:
    spark_test_jar.jar
    1.49MB
  • Flink jar:
    Flink_test_jar.jar
    14.32KB
  • Word 文本信息
    wordcount文件.txt
    1.15KB

上传操作如下:

  1. 登录 EMR 控制台
  2. 在左侧导航栏中,单击集群管理 > 集群列表 > DolphinScheduler 集群详情 > 访问链接 > DolphinScheduler UI 访问链接, 进入 Web UI 登录界面。
  3. 输入对应的用户名和密码信息,确认后进入 Web UI 界面。创建用户请参阅快速开始---创建用户
  4. 在上方导航栏中,单击资源中心按钮,进入资源文件夹管理界面。
  5. 文件管理界面,单击上传文件按钮,从本地选择对应文件,单击确定按钮,完成资源上传。
    图片

3 数据源中心

在运行 Spark SQL、Hive SQL 类型任务时,DolphinScheduler 要求在数据源中心中预先配置好数据源连接信息。这里以 EMR 3.x 版本的 Hadoop 类型集群中配置一个 Spark 数据源为例。

3.1 HA 集群

HA集群配置示例如下:

  1. 单击界面上方导航栏中的数据源中心按钮,进入数据源中心界面。

  2. 单击创建数据源,配置以下数据源信息:

    配置项

    示例

    说明

    数据源

    spark

    下拉选择数据源类型,此处选择 spark 数据源类型。

    数据源名称

    spark_source

    输入数据源名称信息。

    描述

    HA 集群配置

    输入该数据源的描述信息,方便后续管理。

    IP 主机名

    emr-master-1-1,emr-master-2,emr-master-3

    输入 spark 数据源的 master 名称。

    端口

    2181

    填写对应的端口号信息。

    用户名

    hive

    Spark 数据源下对应的用户名信息

    密码


    对应的密码信息。

    数据库名

    default

    数据源下对应的数据库名信息。

    Jdbc 链接参数

    {"serviceDiscoveryMode":"zooKeeper","zooKeeperNamespace":"midas/ha","auth":"LDAP"}

    输入 json 格式的连接参数,以 {"key1":"value1","key2":"value2"...} 格式输入,非必填。

  3. 以上使用的用户名密码可以从以下路径获取:

    1. 进入 EMR 控制台 > 集群管理 > 集群列表 > DolphinScheduler 集群详情 > 服务列表 > OpenLDAP > 服务参数界面。
    2. 获取 Hive 服务参数名称的管理员账号信息:
      • 账号:hive_admin 配置项内容;
        • 密码:hive_password配置项内容。
          图片

3.2 非 HA 集群

非 HA 集群配置示例如下:

配置项

示例

说明

数据源

spark

下拉选择数据源类型,此处选择 spark 数据源类型。

数据源名称

spark_data

输入数据源名称信息。

描述

非 HA 集群配置

输入该数据源的描述信息,方便后续管理。

IP 主机名

emr-master-1-1

输入 spark 数据源的 master 名称。

端口

10005

填写对应的端口号信息。

用户名

hive

Spark 数据源下对应的用户名信息

密码


对应的密码信息。

数据库名

default

数据源下对应的数据库名信息。

Jdbc 链接参数

输入 json 格式的连接参数,以 {"key1":"value1","key2":"value2"...} 格式输入,非必填。

4 工作流定义

4.1 使用前提

  1. 已创建相关 DolphinScheduler 项目、用户信息及队列信息。详见快速开始

4.2 操作步骤

  1. 单击界面上方项目管理按钮,进入项目列表界面。
  2. 单击详细项目名称,进入到项目概览界面。
  3. 在左侧导航栏中,单击工作流 > 工作流定义按钮,进入工作流定义页面。
  4. 单击创建工作流按钮,进入工作流 DAG 编辑页面,拖动相应的任务类型到右边栏中,新增对应任务。

4.3 Hive / Spark SQL

4.3.1 配置作业

拖动 SQL 任务类型到右边栏中,完成以下配置信息:

参数

说明

节点名称

输入任务类型的节点名称信息。

运行标志

选择任务运行的状态,支持选择正常、禁止执行,若勾选“禁止执行”,运行工作流不会执行该任务。

描述

为当前节点填写任务描述信息,方便后续区分。

任务优先级

选择当前任务的优先级情况,从 HIGHEST 到 LOWSET 分为5个等级,您可根据实际情况进行选择,当 worker 线程数不足时,级别高的任务在执行队列中会优先执行,相同优先级的任务按照先进先出的顺序执行。

Worker 分组

下拉选择已创建成功的 Worker 分组信息。

环境名称

配置任务执行的环境,下拉选择已创建成功的环境名称。

任务组名称

通过任务组来管理任务实例占用的资源,避免占用太多资源导致了集群其他组件受到资源上的影响。您可在资源中心 > 任务组管理中,创建任务组信息。详见任务组管理

组内优先级

设置该任务,在当前任务组内的优先级,在同个任务组内优先获得资源。

失败重试次数

任务失败重新提交的次数,可进行手动填充。

失败重试间隔

任务失败重新提交任务的时间间隔,可以进行手动填充。

延时执行时间

任务延迟执行的时间,以分为单位。

超时告警

设置超时告警、超时失败。当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。

数据源类型

下拉选择已创建成功的 Spark 数据源。

数据源实例

选择的测试连通性成功的 Spark 数据源实例信息。

SQL 类型---查询

  • SQL 查询类型,您可以选择是否开启发送邮件告警:
    • 主题:告警邮件发送的主题信息。
    • 告警组:选择对应的告警组信息,用于邮件接收。配置详见快速开始
  • 日志显示:指定日志中展示的查询结果行数。

SQL 类型---非查询

可执行多段SQL,并设置分段执行符号。

SQL 语句

输入 SQL 执行语句。

UDF 函数

对于 HIVE 类型的数据源,可以引用资源中心中创建的 UDF 函数,其他类型的数据源暂不支持UDF函数。

自定义参数

SQL 任务类型,而存储过程是自定义参数顺序,给方法设置值自定义参数类型和数据类型,同存储过程任务类型一样。区别在于 SQL 任务类型自定义参数会替换 SQL 语句中${变量}。

前置 SQL 语句

前置 SQL 在 SQL 语句之前执行。

后置 SQL 语句

后置 SQL 在 SQL 语句之后执行。

前置任务

设置当前任务的前置(上游)任务,形成上下游依赖关系。

Spark SQL 与 Hive 一致,只需要在数据源选择的时候选择正确的类型以及实例即可。
SQL 语句示例:

-- SQL语句:
create table if not exists dolphin_test_spark_x(id int);

4.3.2 运行作业

作业配置完成后,您可执行以下步骤,执行配置的作业:

  1. 在编辑界面,右上角单击保存按钮,保存工作流的配置。
  2. 回到工作流定义列表界面,单击操作列中的上线按钮,完成作业发布。
  3. 单击运行运行按钮,工作流生成相应的工作流实例,您可以进入工作流实例界面,进行查看具体实例执行情况。
    图片
  4. 待工作流实例运行完成后,登录到 DolphinScheduler 集群的实例节点,执行如下命令:
    登入方式详见创建租户-集群远程登录
spark-sql

show tables;

图片

Flink 类型任务中统一包含了 SQL 与 JAR 两种提交方式。

4.4.1 JAR 包提交

工作流主要参数配置如下:

参数

示例值

说明

程序类型

JAVA

支持选择 JAVA、SQL、SCALA、PYTHON 类型。

主函数的 Class

org.apache.flink.streaming.examples.wordcount.WordCount

输入 Jar 包中的主函数 Class 信息。

主程序包

Flink_test_jar.jar

下拉选择已上传成功的 Jar 包资源。

部署方式

pre-job/cluster

支持 pre-job/cluster(Flink 版本大于 1.10 时可选)、application、local的部署方式

Flink 版本

=1.12

支持选择 '<1.10'、'1.11'、'>=1.12' 三个 Flink 版本信息。

主程序参数

-input hdfs://emr-master-1-1:8020/dolphinscheduler/项目名称/resources/wordcount文件.txt --output hdfs:/emr-master-1-1:8020/tmp/test_x

根据实际的路径和文件名,填写主程序参数。

说明

  • ha集群用hdfs://emr-cluster/...
  • 本示例运行一个 WordCount 程序,该程序要求有一个输入的文本文件,并将结果输出到指定位置,这里我们通过主程序参数中的 --input--output 予以指定。

资源

wordcount文件.txt

下拉选择已上传成功的文件信息。

4.4.2 SQL

运行 Flink SQL 任务只需要将程序类型选择为 SQL,并在脚本输入框中录入需要执行的 SQL 即可。关于 Flink 不同部署方式在 EMR 中的支持情况,请参阅 Flink 基础使用

4.5 Spark-submit

与 Flink JAR 提交的方式相同,Spark-submit 运行同样依赖通过资源中心上传的程序包和配套资源文件。
工作流主要参数配置如下:

参数

示例值

说明

程序类型

JAVA

支持选择 JAVA、SQL、SCALA、PYTHON 类型。

Spark 版本

SPARK2

根据实际场景,选择 Spark 的版本信息,支持选择 Spark1、Spark2。

主函数的 Class

org.apache.spark.examples.JavaWordCount

输入 Jar 包中的主函数 Class 信息。

主程序包

spark_test_jar.jar

下拉选择已上传成功的 Jar 包资源。

部署方式

cluster

支持 cluster、client、local 的部署方式。

主程序参数

/dolphinscheduelr/租户名/resources/资源中心文件夹名/文件名称

根据实际的租户名和文件名,填写主程序参数。

资源

wordcount文件.txt

下拉选择已上传成功的文件信息。

配置完成,并运行工作流任务成功后,您可登入到 YARN UI 界面,查看对应的执行结果:

  1. 进入集群管理 > 集群列表 > DolphinScheduler 集群详情 > 服务列表 > OpenLDAP > 服务参数界面。
  2. 获取登录 YARN UI 的用户名和密码信息。
  3. 进入DolphinScheduler 集群详情 > 访问链接, 单击 YARN ResourceManager UI 访问链接,并登录。
  4. 在对应的 spark 作业 ID 下,查看最终的执行日志结果。