在使用 HDFS API 进行开发操作之前,必须要对 HDFS 进行初始化,HDFS 的初始化一般有两种形式:
直接加载 HDFS 集群中的配置文件,比如:core-site.xml 和 hdfs-site.xml 两个文件;
使用 conf 对象提供的一些方法去手动加载hdfs集群信息,比如集群名、副本数等。
在此处我们只介绍第一种方式:
Configuration conf = new Configuration();//初始化conf变量 private void init() throws IOException { conf = new Configuration(); //假设 以下两个配置文件均以拷贝到conf下 conf.addResource("conf/hdfs-site.xml"); conf.addResource("conf/core-site.xml"); fSystem = FileSystem.get(conf); }
初始化完 HDFS 文件系统后,我们就可以使用 HDFS 提供的 API 进行相应的开发。
主要用到 hadoop-common、hadoop-hdfs、hadoop-client 三个依赖包。
private static void uploadTest() throws Exception{ //其中local_src.txt是本地文件,hdfs_dst.txt是上传到hdfs后的文件 fSystem.copyFromLocalFile(new Path("local_src.txt"), new Path("hdfs_dst.txt")); fSystem.close(); }
private static void downloadTest() throws Exception{ //其中hdfs_src.txt是hdfs上的文件,local_dst.txt是从hdfs上下载的文件 fSystem.copyToLocalFile(new Path("hdfs_src.txt"), new Path("local_dst.txt")); fSystem.close(); }
上传和下载的 API 的底层封装其实就是 : FileUtil.copy(....)
当然,我们也可以使用Stream方式上传、下载文件:
private static void uploadTest() throws Exception{ //local.gz是本地文件系统上的文件,hdfs.gz是hdfs上的文件 InputStream in = new FileInputStream(new File("local.gz")); FSDataOutputStream out = fSystem.create(new Path("/hdfs.gz")); IOUtils.copyBytes(in, out, 4096, true); fSystem.close(); }
private static void downloadTest() throws Exception{ //其中hdfs.gz是hdfs上的文件,local.gz是下载到本地文件系统中的文件 FSDataInputStream in = fs.open(new Path("/hdfs.gz")); OutputStream out = new FileOutputStream(new File("local.gz")); IOUtils.copyBytes(in, out, 4096, true); fSystem.close(); }
private void createFileAndWrite() throws IOException { //将content中的内容写入到hdfs文件Write.txt中 final String content = "So well!"; FSDataOutputStream out = null; try { out = fSystem.create(new Path("/user/Write.txt")); out.write(content.getBytes()); out.hsync(); } finally { // make sure the stream is closed finally. out.close(); } }
private void appendFileContents() throws IOException { //将content中的内容追加写入到hdfs文件Write.txt中 final String content = "hi boy"; FSDataOutputStream out = null; try { out = fSystem.append(new Path("/user/Write.txt")); out.write(content.getBytes()); out.hsync(); } finally { // make sure the stream is closed finally. out.close(); } }
如果待追加的文件 Write.txt 已经存在,并且同时没有别的客户端正在写入内容,否则此次追加内容会失败抛出异常。
通过调用 FileSystem 实例的 open 方法获取某个读取文件的输入流。然后使用该输入流读取 HDFS 上的指定文件的内容。最后,调用 close() 关闭输入流。
private void read() throws IOException { //读取文件的内容 String strPath = "/user/Reader.txt"; Path path = new Path(strPath); FSDataInputStream in = null; BufferedReader reader = null; StringBuffer strBuffer = new StringBuffer(); try { in = fSystem.open(path); reader = new BufferedReader(new InputStreamReader(in)); String sTempOneLine; // write file while ((sTempOneLine = reader.readLine()) != null) { strBuffer.append(sTempOneLine); } LOG.info("the content is : " + strBuffer.toString()); } finally { IOUtils.closeStream(reader); IOUtils.closeStream(in); } }
private void deleteFile() throws IOException { //删除以下文件,且不可恢复 Path beDeleted = new Path("/user/fileDelete.txt"); boolean res=fSystem.delete(beDeleted, true); System.out.println(res); }
private static void testMkDir() throws Exception{ //嵌套创建目录 boolean mkdirs = fSystem.mkdirs(new Path("/user/test1/test2")); System.out.println("mkdirs success!"); }
private static void testDel() throws Exception{ //递归的删除目录 if (!fSystem.exists("/user/test1")) {//目录不存在 return false; } boolean delBl = fSystem.delete(new Path("/user/test1"), true); System.out.println(delBl); }
private static void listFilesAndBlocks()throws Exception{ //列出指定的目录下的所有文件,此处以hdfs下的根目录为例 RemoteIterator<LocatedFileStatus> listFiles = fSystem.listFiles(new Path("/"), true); while(listFiles.hasNext()){ LocatedFileStatus file = listFiles.next(); System.out.println(file.getPath()+"\t"); System.out.println(file.getPath().getName()+"\t"); System.out.println(file.getLen()+"\t"); System.out.println(file.getReplication()+"\t"); //获取blcoks相关的信息 BlockLocation[] blockLocations = file.getBlockLocations(); System.out.println(blockLocations.length+"\t"); for(BlockLocation bl : blockLocations){ String[] hosts = bl.getHosts(); System.out.print(hosts[0] + "-" + hosts[1]+"\t"); } } }
如果不想递归查看,fSystem.listFiles 中的参数写 false。
关于 HDFS API 更多的详细介绍,请参见Apache Hadoop社区文档。