Spark 是专为大规模数据分析处理而设计的开源分布式计算框架。本文介绍如何配置 EMR 中的 Spark 服务使用 CloudFS。
说明
集群所有节点都要修改如下配置。
wget https://cloudfs.tos-cn-beijing.volces.com/sdk/prod/cloudfs-client-1.5.3.43.tar.gz
tar -zxvf cloudfs-client-1.5.3.43.tar.gz
将解压文件cloudfs-client/lib/cloudfs-sdk-1.5.3.43.jar
拷贝至集群/share/hadoop/hdfs
路径下。
cp {YOUR_DOWNLOAD_PATH}/cloudfs-client/lib/cloudfs-sdk-1.5.3.43.jar /opt/{Directory}/hadoop/share/hadoop/hdfs/
配置core-site.xml
文件。
core-site.xml
文件:vim {hadoop_安装目录}/hadoop/conf/core-site.xml
core-site.xml
中添加配置:<property> <name>fs.defaultFS</name> <value>cfs://xxxx.cfs-cn-beijing.ivolces.com</value> <!-- 填写获取的挂载点地址 --> </property> <property> <name>fs.cfs.impl</name> <value>com.volcengine.cloudfs.fs.CfsFileSystem</value> </property> <property> <name>fs.AbstractFileSystem.cfs.impl</name> <value>com.volcengine.cloudfs.fs.CFS</value> </property> <property> <name>cfs.access.key</name> <value>AKxxxxxxxxxxx</value> <!-- 填写访问密钥ID --> </property> <property> <name>cfs.secret.key</name> <value>SKxxxxxxxxxxx</value> <!-- 填写私有访问密钥--> </property> <!-- 可选:如果使用的是 STS Token,需要填写 --> <property> <name>cfs.security.token</name> <value>STSTokenxxxxxxxx</value> </property> <!-- 可选:如果开启缓存加速,需要配置缓存加速接入的 VPC 的网段 --> <property> <name>cfs.client.network.segment</name> <value><VPC 网段,例如 192.168.0.0/16></value> </property>
将解压后的 SDK 目录下的cloudfs-client/lib/cloudfs-sdk-1.5.3.43.jar
文件复制到 Spark 的/{Directory}/spark/jars/
目录下。
cp {YOUR_DOWNLOAD_PATH}/cloudfs-client/lib/cloudfs-sdk-1.5.3.43.jar /{Directory}/spark/jars/
重启 Spark 服务。
使用 Spark 处理 CloudFS 中的数据有两种方式。具体步骤如下:
使用Spark-sql
处理 CloudFS 的数据。
使Spark-sql
处理 CloudFS 中的数据的方式和 Hivesql 类似,只需要建表的时候指定 locatition
至 CloudFS 文件系统即可。详细操作,可参见使用 Hive 处理 CloudFS 中的数据。
使用Spark-shell
处理 CloudFS 的数据。具体操作步骤如下:
bin
目录下,执行以下命令进去界面。./spark-shell
如果返回以下信息,则表示执行成功。
2. 查询 CloudFS 中的数据。
读取 CloudFS 路径下已有的文件。
val a =sc.textFile("cfs://{Directory}"); <!-- 从CloudFS路径下读取已有的文件 --> a.count <!-- 计算读取到的文件 -->
写入文件到指定的 CloudFS 路径下。
a.saveAsTextFile("cfs://{Directory}")