|
package com.invic.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
*
* @author lucl
* @ 通过FileSystem API来实现
* FileSystem get(Configuration) 通过设置配置文件core-site.xml读取类路径来实现,默认本地文件系统
* FileSystem get(URI, Configuration) 通过URI来设定要使用的文件系统
* FileSystem get(URI, Configuration, user) 作为给定用户来访问文件系统,对安全来说至关重要
*/
public class MyHdfsOfFS {
private static String HOST = "hdfs://nnode";
private static String PORT = "8020";
private static String NAMENODE = HOST + ":" + PORT;
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
String path = NAMENODE + "/user/";
/**
* 由于这里设计的为hadoop的user目录,默认会查询hdfs的用户家目录下的文件
*/
String user = "hadoop";
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(path), conf, user);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (null == fs) {
return;
}
/**
* 递归创建目录
*/
boolean mkdirs = fs.mkdirs(new Path("invic/test/mvtech"));
if (mkdirs) {
System.out.println("Dir ‘invic/test/mvtech’ create success.");
}
/**
* 判断目录是否存在
*/
boolean exists = fs.exists(new Path("invic/test/mvtech"));
if (exists) {
System.out.println("Dir ‘invic/test/mvtech’ exists.");
}
/**
* FSDataInputStream支持随意位置访问
* 这里的lucl.txt默认查找路径为/user/Administrator/lucl.txt
因为我是windows的eclipse
* 如果我上面的get方法最后指定了user
则查询的路径为/user/get方法指定的user/lucl.txt
*/
FSDataInputStream in = fs.open(new Path("lucl.txt"));
OutputStream os = System.out;
int buffSize = 4098;
boolean close = false;
IOUtils.copyBytes(in, os, buffSize, close);
System.out.println("\r\n跳到文件开始重新读取文件。。。。。。");
in.seek(0);
IOUtils.copyBytes(in, os, buffSize, close);
IOUtils.closeStream(in);
/**
* 创建文件
*/
FSDataOutputStream create = fs.create(new Path("sample.txt"));
create.write("This is my first sample file.".getBytes());
create.flush();
create.close();
/**
* 文件拷贝
*/
fs.copyFromLocalFile(new Path("F:\\Mvtech\\ftpfile\\cg-10086.com.csv"),
new Path("cg-10086.com.csv"));
/**
* 文件追加
*/
FSDataOutputStream append = fs.append(new Path("sample.txt"));
append.writeChars("\r\n");
append.writeChars("New day, new World.");
append.writeChars("\r\n");
IOUtils.closeStream(append);
/**
* progress的使用
*/
FSDataOutputStream progress = fs.create(new Path("progress.txt"),
new Progressable() {
@Override
public void progress() {
System.out.println("write is in progress......");
}
});
// 接收键盘输入到hdfs上
Scanner sc = new Scanner(System.in);
System.out.print("Please type your enter : ");
String name = sc.nextLine();
while (!"quit".equals(name)) {
if (null == name || "".equals(name.trim())) {
continue;
}
progress.writeChars(name);
System.out.print("Please type your enter : ");
name = sc.nextLine();
}
/**
* 递归列出文件
*/
RemoteIterator it = fs.listFiles(new Path(path), true);
while (it.hasNext()) {
LocatedFileStatus loc = it.next();
System.out.println(loc.getPath().getName() + "|" + loc.getLen() + "|"
+ loc.getOwner());
}
/**
* 文件或目录元数据:文件长度、块大小、复本、修改时间、所有者及权限信息
*/
FileStatus status = fs.getFileStatus(new Path("lucl.txt"));
System.out.println(status.getPath().getName() + "|" +
status.getPath().getParent().getName() + "|" + status.getBlockSize() + "|"
+ status.getReplication() + "|" + status.getOwner());
/**
* 列出目录中文件listStatus,若参数为文件则以数组方式返回长度为1的FileStatus对象
*/
fs.listStatus(new Path(path));
fs.listStatus(new Path(path), new PathFilter() {
@Override
public boolean accept(Path tmpPath) {
String tmpName = tmpPath.getName();
if (tmpName.endsWith(".txt")) {
return true;
}
return false;
}
});
// 可以传入一组路径,会最终累计合并成一个数组返回
// fs.listStatus(Path [] files);
FileStatus [] mergeStatus = fs.listStatus(new Path[]{new Path("lucl.txt"),
new Path("progress.txt"), new Path("sample.txt")});
Path [] listPaths = FileUtil.stat2Paths(mergeStatus);
for (Path p : listPaths) {
System.out.println(p);
}
/**
* 文件模式匹配
*/
FileStatus [] patternStatus = fs.globStatus(new Path("*.txt"));
for (FileStatus stat : patternStatus) {
System.out.println(stat.getPath());
}
/**
* 删除数据
*/
boolean recursive = true;
fs.delete(new Path("demo.txt"), recursive);
fs.close();
}
}
|
|
|