Flink 是一个面向有限流和无限流有状态计算的分布式计算框架,它能够支持流处理和批处理两种应用类型。
本文介绍如何配置 EMR 中的 Flink 服务使用和访问 CloudFS。
说明
集群所有节点都要修改如下配置。
wget https://cloudfs.tos-cn-beijing.volces.com/sdk/prod/cloudfs-client-1.5.3.43.tar.gz
tar -zxvf cloudfs-client-1.5.3.43.tar.gz
将解压文件cloudfs-client/lib/cloudfs-sdk-1.5.3.43.jar
拷贝至集群/share/hadoop/hdfs
路径下。
cp {YOUR_DOWNLOAD_PATH}/cloudfs-client/lib/cloudfs-sdk-1.5.3.43.jar /opt/{Directory}/hadoop/share/hadoop/hdfs/
配置core-site.xml
文件。
core-site.xml
文件:vim {hadoop_安装目录}/hadoop/conf/core-site.xml
core-site.xml
中添加配置:<property> <name>fs.defaultFS</name> <value>cfs://xxxx.cfs-cn-beijing.ivolces.com</value> <!-- 填写获取的挂载点地址 --> </property> <property> <name>fs.cfs.impl</name> <value>com.volcengine.cloudfs.fs.CfsFileSystem</value> </property> <property> <name>fs.AbstractFileSystem.cfs.impl</name> <value>com.volcengine.cloudfs.fs.CFS</value> </property> <property> <name>cfs.access.key</name> <value>AKxxxxxxxxxxx</value> <!-- 填写访问密钥ID --> </property> <property> <name>cfs.secret.key</name> <value>SKxxxxxxxxxxx</value> <!-- 填写私有访问密钥--> </property> <!-- 可选:如果使用的是 STS Token,需要填写 --> <property> <name>cfs.security.token</name> <value>STSTokenxxxxxxxx</value> </property> <!-- 可选:如果开启缓存加速,需要配置缓存加速接入的 VPC 的网段 --> <property> <name>cfs.client.network.segment</name> <value><VPC 网段,例如 192.168.0.0/16></value> </property>
将解压后的 SDK 目录下的cloudfs-client/lib/cloudfs-sdk-1.5.3.43.jar
文件复制到Flink lib
文件夹下。
cp {YOUR_DOWNLOAD_PATH}/cloudfs-client/lib/cloudfs-sdk-1.5.3.43.jar /{Directory}/flink/lib/
重启 Flink 服务。
说明
需自行上传测试数据。
准备测试数据。
hdfs dfs -ls cfs://`{`Directory`}/`{document} <!-- 查看上传目录下的文件 -->
执行 Flink 计算命令。
export HADOOP_CONF_DIR=/`{`Directory`}`/hadoop/conf <!-- 该命令为指定环境使用于你的hadoop下的conf文件夹 --> flink run -t local /`{`Directory`}`/flink/examples/batch/WordCount.jar \ ---input cfs://`{`Directory`}`/{document} \ <!-- 此处文件夹下的数据作为输入 --> --output cfs://`{`Directory`}`/{document} <!-- 此处为经过WordCount计算后的输出文件 -->
查看计算结果。
hdfs dfs -cat cfs://`{`Directory`}`/{document} <!-- 此处为计算后的文件路径 -->
返回如下图类似信息,则表示配置 Flink 成功。