火山引擎 E-MapReduce(EMR)认证配置部分详见:Hadoop 使用 Proton - 认证配置。
默认情况下,只要是同一用户开通的 EMR 和 TOS 服务,在 EMR 集群上,则无需手动配置访问 TOS。否则,您需要进行手动 AK/SK 配置。
目前 EMR 集群已经为您配置好了 HDFS 相关配置,您无需额外配置。
目前 EMR 集群已经为您配置好了 Spark 相关配置,您无需额外配置。
请参考 Hadoop 使用 Proton - 认证配置 部分进行认证配置。
plugins/hadoop${hadoop.major.version}/proton-hadoop${hadoop.major.version}-bundle-{proton.version}.jar
置于如下两个目录。其中 hadoop.major.version
取值 2
或 3
,对应 hadoop 2.x 或者 hadoop 3.x,proton.version
是 Proton 版本。
$HADOOP_HOME/share/hadoop/hdfs/
$SPARK_HOME/jars/
配置 hadoop 的 core-site.xml
,使得 HDFS 客户端访问 TOS 时使用 Proton 的 FileSystem:
<configuration> <property> <name>fs.tos.access-key-id</name> <value>{iam.role.access.key}</value> <!--TOS 认证配置:EMR 集群可以不用配置此项--> </property> <property> <name>fs.tos.secret-access-key</name> <value>{iam.role.access.val}</value> <!--TOS 认证配置:EMR 集群可以不用配置此项--> </property> <property> <name>fs.tos.impl</name> <value>io.proton.fs.ProtonFileSystem</value> </property> <property> <name>fs.tos.endpoint</name> <value>http://tos-cn-{region}.volces.com</value> <!--如果是在火山 ECS 环境(包括火山 EMR),可以使用内网 endpoint:http://tos-cn-{region}.ivolces.com--> </property> </configuration>
下载安装基于 proton 的 spark committer
首先将 proton-spark-3.5.1-2.2.jar (对应 spark-3.5.1 版本)下载,放置在每个节点 ${SPARK_HOME}/jars
下面。如果配置了 spark.yarn.archive
参数,那么也要确保该 archive 中包含此 jar。
对于其他版本的 committer jar,可以从 JobCommitter版本获取。
配置spark
配置 Spark 的 spark-default.conf
,使得 Spark 提交作业时使用 Proton 的 JobCommitter:
spark.sql.extensions io.proton.spark.ProtonSparkSessionExtension
spark.hadoop.mapreduce.outputcommitter.class io.proton.commit.Committer spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.EMROutputCommitProtocol
注意
对于自建集群,确保 Spark 可以加载到 Hadoop 的 core-site.xml
,这一点可以通过在 spark-env.sh
中设置 export HADOOP_CONF_DIR=/your/hadoop/conf/dir
来解决。
CREATE TABLE testProton(id INT, name STRING) USING parquet LOCATION "tos://<your_bucket>/path/to/table";
INSERT INTO testProton VALUES (1, 'zhangsan'), (2, 'lisi');
_SUCCESS
文件,下载并打开该文件 ,如果有如下字样,则表明使用了 Proton committer:{ "name" : "io.proton.commit.SuccessData", "success" : true, "timestamp" : 1679043015429, "date" : null, "hostname" : "xdesktop/127.0.0.1", "committer" : "io.proton.commit.Committer", "description" : "Task committer attempt_20230317165012_0000_m_000000_0", "jobId" : "job_25d10e1f-e315-4403-8acc-7adada1f1ed0", "filenames" : [ "spark-tables/1679042998827/testProton/part-00000-f9917923-a220-49fb-a2dd-626535a7a6d7-c000.snappy.parquet" ], "diagnostics" : { "fs.job.committer.threads" : "10" } }