DistCp是用于大规模集群间/集群内数据拷贝的工具,实现了不同文件系统间(主要是HDFS)的数据拷贝,
本文将介绍如何基于Proton使用DistCp从HDFS中将数据拷贝到TOS。
参考:Proton 无缓存模式
# 构造 DistCp命令将HDFS数据拷贝到TOS >hadoop distcp \ -Dmapreduce.job.queuename=default \ -Dmapreduce.job.name=test-distcp \ -i \ -update \ -delete \ -m 50 \ -bandwidth 50 \ -numListstatusThreads=40 \ -skipcrccheck \ -direct \ hdfs://source_hdfs:8020/source_path \ tos://bucket-name/my_target_path
注意
由于HDFS的文件校验方式和TOS不一致,需要使用-skipcrccheck参数跳过校验值检查,数据存在不一致风险。
在将数据通过distcp工具导入到 TOS 对象存储服务时,建议开启 -direct 开关。因为该开关可以避免distcp将源端数据copy 到临时文件,再通过rename操作将临时文件copy 到目的文件中。开启-direct可以将对象存储的网络带宽占用降低一倍。
参数 | 说明 |
---|---|
-m | 最大的拷贝并行度,对应的是DistCp作业中最大的map数量 |
-bandwidth | 每个map能使用的最大带宽,单位是MB/s。通过-m和-bandwidth参数来控制distcp作业的拷贝限速,速度上限=map数量 * bandwidth |
-numListstatusThreads | 从namenode中获取元数据信息的线程数 |
-update | 把src存在target不存在 & src与target都存在但内容不同的文件拷贝到target |
-delete | src不存在但target存在的文件,从target删除 |
-useiterator | 使用iterator来构建CopyListing,避免OOM |
-i | 忽略DistCp过程中报错 |
-direct | Distcp默认会先写一个临时文件再rename为target文件,使用-direct后会直接将数据写到target |
-strategy | -strategy=uniform:Distcp默认采用uniform方式,即在拷贝前根据数据集大小,为每个maptask划分尽量平均的拷贝数据集 -strategy=dynamic:会将数据集划分为多个小chunk,每个map task会尽力去消费chunk,可以优化长尾情况。长尾原因:
|
-atomic | 会把所有文件拷贝到一个临时目录,待全部拷贝完成后统一rename到target下,在TOS场景下不要使用。 |
通过设置-strategy=dynamic优化长尾问题。设拷贝总文件数=TOTAL_FILE,map数=TOTAL_MAP,单chunk文件数100(可根据实际情况自行评估)。
单个map需要处理的文件数=AVERAGE_MAP_FILE=ceiling(TOTAL_FILE/TOTAL_MAP)。
单个map需要处理的chunk数=ceiling(AVERAGE_MAP_FILE/100)。配置为mapred.listing.split.ratio。
总chunk数=map数*单map处理chunk数=TOTAL_MAP,配置为distcp.dynamic.max.chunks.tolerable。
eg 1. 总文件数500w,map数150。 2. 单个map处理records数=500w/150=33334(向上取整),单chunk文件数设100是合适的。 3. 单map需要处理的chunk数=33334/100=334(向上取整)。 4. 总chunk数=150 * 334=50100。 5. 配置如下 mapred.listing.split.ratio=334 distcp.dynamic.max.chunks.tolerable=50100
在针对源端目录会持续变化,可以通过snapshot的方式去获取一个静态的源端数据用于DistCp。这边建议对1级或者2级目录进行snapshot,不要对根目录进行snapshot
# 允许使用snapshot >hdfs dfsadmin -allowSnapshot /source_path # 创建snapshot >hdfs dfs -createSnapshot /source_path source_path_snapshot_name # 上述Distcp命令可以变成 >hadoop distcp \ -Dmapreduce.job.queuename=default \ -Dmapreduce.job.name=test-distcp \ -i \ -update \ -delete \ -m 50 \ -bandwidth 50 \ -numListstatusThreads=40 \ -skipcrccheck \ -strategy dynamic \ -direct \ hdfs://source_hdfs:8020/source_path/.snapshot/source_path_snapshot_name \ tos://bucket-name/target_path
当使用strategy=dynamic时遇到如下报错:
java.io.IOException: Too many chunks created with splitRatio:2, numMaps:250. Reduce numMaps or decrease split-ratio to proceed. at org.apache.hadoop.tools.mapred.lib.DynamicInputFormat.validateNumChunksUsing(DynamicInputFormat.java:185)
可以调大distcp.dynamic.max.chunks.tolerable
的值。eg. 在Distcp 命令中增加-Ddistcp.dynamic.max.chunks.tolerable=10000