安装后默认已经预置了部分用户的权限,如已经预置 hive 用户的权限,如需添加新的用户和新的权限,可以在 Ranger Admin 界面添加新的权限 Policy,详细可以参考 Ranger 帮助文档下 Spark集成 章节。
说明
在 EMR-3.4.0 及以后的版本中,将下线 Ksana 组件相关功能;
在 EMR-3.3.0 及之前的版本中,仍保留 Ksana 组件相关功能,您可创建 EMR-3.3.0 及之前的集群版本,来使用 Ksana 功能。
Hudi可通过创建连接的时候指定Hudi的参数,该方式针对当前连接生效:
beeline --hiveconf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --hiveconf spark.serializer=org.apache.spark.serializer.KryoSerializer --hiveconf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog -u "jdbc:hive2://emr-master-1:10005/default;auth=LDAP" -n <user> -p <password>
创建Hudi表:
create table hudi_mor_tbl ( id int, name string, price double, ts bigint ) using hudi tblproperties ( type = 'cow', primaryKey = 'id', preCombineField = 'ts' );
插入数据:
insert into hudi_mor_tbl_1(id, name, price, ts)values(1, 'test', 1, 1);
查询结果:
select * from hudi_mor_tbl; [pool-30-thread-5] INFO com.bytedance.emr.midas.engine.spark.operation.SparkOperation - Processing EXECUTE_STATEMENT statement: EXECUTE_STATEMENT , time taken: 0.471 seconds +----------------------+------------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------+--------+-----+ | _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | +----------------------+------------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------+--------+-----+ | 20220429103652951 | 20220429103652951_0_1 | id:1 | | a4cadbf7-973c-44c1-b54c-1b07222e6040-0_0-2.parquet | 1 | test | 1.0 | 1 | +----------------------+------------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------+--------+-----+ 1 row selected (2.077 seconds)
Iceberg可通过创建连接的时候指定Hudi的参数,该方式针对当前连接生效:
beeline --hiveconf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog --hiveconf spark.sql.catalog.iceberg.type=hive --hiveconf spark.sql.catalog.iceberg.uri=thrift://emr-master-1:9083,thrift://emr-master-2:9083,thrift://emr-master-3:9083 --hiveconf spark.sql.catalog.iceberg.warehouse=hdfs://emr-master-1:8020/user/hive/warehouse/iceberg/hive --hiveconf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -u "jdbc:hive2://emr-master-1:10005/default;auth=LDAP" -n <user> -p <password>
Ksana for SparkSQL 是提前在 Yarn 上提交 Spark 引擎,进行 SQL 的接收,返回执行结果,可以在提交作业的时候,指定参数的信息,也可以修改组件参数配置,设置默认资源,修改方式为:
集群详情 > 服务列表 > Spark > 服务参数
搜索如下参数,进行默认值的修改:
参数名称 | 说明 |
---|---|
spark.master | 作业的提交方式,默认为 yarn |
spark.driver-memory | driver 端默认内存为 1g |
spark.driver-cores | driver 默认 core 数量为 1 |
spark.executor-cores | executor 默认 core 数量为 1 |
spark.num-executors | 默认 executor 数量为 1 |
spark.executor-memory | 默认 executor 内存为 1g |
spark.queue | 作业提交队列,默认为 default |
除了通过配置文件修改全局的默认参数外,也可在创建连接的时候动态的对某个参数进行覆盖,参数生效范围为本次连接,例如可通过:
./bin/beeline --hiveconf spark.queue=队列名称
进行默认队列的修改,其余参数均可通过该方式进行设置,此外 Ksana for SparkSQL 兼容所有原生 Spark 参数,可以通过以下路径进行设置:
集群详情 > 服务列表 > Spark > 服务参数 > 自定义参数
关于 Spark 原生参数,请查看:https://spark.apache.org/docs/latest/configuration.html#overriding-configuration-directory
Ksana for SparkSQL 支持 Session,User,Open 三种不同的资源隔离级别,具体的定义如下:
Session:
Session 级别下,Ksana for SparkSQL 会为每一次 connection 创建一个 Spark 引擎,该 connection 下的所有 SQL 作业均由该 Spark 引擎进行执行,connection 关闭后,该 Spark 引擎会被停止。
对于资源隔离要求比较高,或者对资源独占有较高诉求的作业,可以选择此种模式。
User:
User 级别下,Ksana for SparkSQL 会为每一个用户创建一个 Spark 引擎,该用户的所有 SQL 作业均会由该 Spark 引擎进行执行,connection 关闭后,该 Spark 引擎不会停止,在指定阈值周期内,没有新的用户作业提交,该引擎会自动停止。
对于需要按照用户级别进行作业提交的场景,可以选择此种模式。
Open:
Open 模式下,所有的用户的 SQL 作业会提交至同一个 Spark 引擎。
对于没有严格的账号权限控制,或者希望共享资源信息的情况下,可以选择此种模式。
不同隔离级别的使用方式为:
建立连接的时候通过设置 midas.engine.isolation 为 SESSION/USER/OPEN,用来声明本次连接是什么隔离级别模式,例如:
./bin/beeline --hiveconf midas.engine.isolation=SESSION
以上命令启动一个 SESSION 模式的 Spark 引擎执行作业。由于 Ksana for SparkSQL 本质上是在 Yarn 上启动一个常驻的 Spark 作业,当资源隔离级别为 USER 或者 OPEN 的时候,如果该 Spark 作业在一定时间内没有接收到 SQL 作业,则该 Spark 作业会从 Yarn 上被停止,默认的空闲周期为1小时,用户可选择使用 ssh 登陆安装了 Ksana for SparkSQL 的机器,编辑如下文件/usr/lib/emr/current/midas-dist/config/midas-env.sh
对如下值进行修改:
export KSANA_ENGINE_TIME_OUT=3600000 export KSANA_SESSION_TIME_OUT=3600000
该参数默认单位为毫秒,修改完成后,需要重启 Ksana for SparkSQL。
引擎预热为 Yarn 会长期保持多个活跃的 Spark 引擎,处于等待接收 SparkSQL 引擎的状态,降低作业第一次执行的时候冷启动带来的等待时间,引擎预热功能默认处于关闭状态,若要使用需要先打开引擎预热,打开方式为:
集群详情 > 服务列表 > Spark > 服务参数
修改 midas.prepare.engine 的值为 true,且对引擎参数进行配置,相关的参数如下:
参数名称 | 说明 |
---|---|
midas.prepare.engine | 引擎预热功能,默认为 false 不开启。 |
midas.prepare.engine.maximum | 最大引擎个数,默认为 3。 |
midas.prepare.engine.minimum | 最小引擎个数,默认为 1。 |
midas.prepare.engine.username | 提交 Spark 引擎的用户,默认为内置的 hive 用户。 |
midas.prepare.engine.level | 资源隔离级别,默认为 USER 级别。 |
midas.prepare.engine.queue | 引擎队列,默认为 default。 |
midas.prepare.engine.driverMemory | driver 内存资源,默认 1G。 |
midas.prepare.engine.driverCores | driver core 数量,默认 1。 |
midas.prepare.engine.executorCores | executor core 数量,默认 1。 |
midas.prepare.engine.numExecutor | executor 数量,默认 1。 |
midas.prepare.engine.executorMemory | executor 内存资源,默认 1G。 |
Ksana for SparkSQL 会基于当前集群资源,基于 midas.prepare.engine.maximum 和 midas.prepare.engine.minimum 的值计算出合适的作业个数,提前在 Yarn 上运行。
Ksana for SparkSQL 默认关闭 Spark 动态资源管理,若需要开启此功能,您可以按以下方法进行配置:
集群详情 > 服务列表 > Spark > 服务参数
修改如下参数:
参数名称 | 说明 |
---|---|
spark.dynamicAllocation.enabled | 动态资源开关,将其设置为 true,默认 false。 |
spark.shuffle.service.enabled | 动态资源开关,将其设置为 true,默认 false。 |
spark.dynamicAllocation.minExecutors | 最小资源数量,默认为 1。 |
即可开启 Spark 动态资源管理。
Ksana for SparkSQL 本身为 Spark 引擎运行 SparkSQL 作业,因此可以在 Yarn 页面上查看对应的作业运行情况,同时也可在机器内查看到对应的日志,查看方式为:
在 集群管理 > 集群列表 > 具体集群名称, 进入 集群详情 界面.
导航栏中点击 服务列表,点击 Spark 服务并进入。
点击 emr 集群节点 (emr-master-1 主机名称)的 ECS ID,跳转进入到云服务器的实例界面,点击右上角的 远程连接 按钮,输入集群创建时的 root 密码,并进入远程终端。
进入 /usr/lib/emr/current/midas-dist/workers 目录,Ksana for SparkSQL 默认按照提交用户进行归类,可在此目录下查看具体 SQL 的执行日志。
Ksana for SparkSQL 默认保留7天的日志,若需要保留更长时间的日志,进入:
集群详情 > 服务列表 > Spark > 服务参数
修改以下参数信息:
参数名称 | 说明 |
---|---|
midas.log.timeout.period | 日志保存周期,默认为 7 天,1000 * 60 * 60 * 24 * 7。 |
midas.log.archive.size | 日志文件归档大小,默认 1GB。 |
可设置为业务需要的过期时间。
Ksana for SparkSQL 兼容 Hive Driver,因此可以使用 Hive 驱动,修改连接端口即可执行 SQL 作业,以 Java 代码连接 Ksana for SparkSQL 为例:
引入相关驱动:
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>您的Hive版本</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>您的hadoop版本</version> </dependency>
使用如下代码执行SparkSQL作业:
public static void main(String[] args) throws SQLException { Connection connection = null; try { Class.forName("org.apache.hive.jdbc.HiveDriver"); Properties properties = new Properties(); properties.put("user", "您的用户名"); properties.put("password", "您的密码"); connection = DriverManager.getConnection("您的Ksana for SparkSQL连接地址", properties); HiveStatement hiveStatement = (HiveStatement) connection.createStatement(); ResultSet rs = hiveStatement.executeQuery("SLQ作业"); while (rs.next()) { //todo 输出结果 } for (String log: hiveStatement.getQueryLog()) { //todo 可获取日志 } } catch (SQLException | ClassNotFoundException e) { e.printStackTrace(); } finally { assert connection != null; connection.close(); } }
可在 properties 对象中,基于业务需要调整相关参数,例如队列,内存等。