1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
| package com.hadoop.examples;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* @Package
* @ClassName: PutMerge
* @Description: 读取本地目录下的文件,写入到HDFS,在写入的过程中,
* 把这三个文件合成一个文件
* @author lxy
* @date 2015年3月25日 上午9:59:38
* @version V1.0
*/
public class PutMerge {
public static void main(String[] args) throws IOException {
// 输入目录,目录下有三个txt,文章最后面会儿给出文件内容
String localPathStr = "E:\\test";
// 输出目录,HDFS路径,文章最后面会给出合并之后的文件内容
String serverPath =
"hdfs://192.168.3.57:8020/user/lxy/mergeresult/merge.txt";
//输入目录,是一个本地目录
Path inputDir = new Path(localPathStr);
//输出目录,是一个HDFS路径
Path hdfsFile = new Path(serverPath);
Configuration conf = new Configuration();
/**
* Hadoop in Action的原代码如下
* FileSystem hdfs = FileSystem.get(conf);
* 但是这样的话,在执行下面的语句是就会报异常,可能是由于版本更新的问题
* FSDataOutputStream out = hdfs.create(hdfsFile);
*/
// 根据上面的serverPath,获取到的是一个org.apache.hadoop.hdfs.DistributedFileSystem对象
FileSystem hdfs = FileSystem.get(URI.create(serverPath), conf);
FileSystem local = FileSystem.getLocal(conf);
try {
//获取输入目录下的文件以及文件夹列表
FileStatus[] inputFiles = local.listStatus(inputDir);
//在hdfs上创建一个文件
FSDataOutputStream out = hdfs.create(hdfsFile);
for (int i = 0; i < inputFiles.length; i++) {
System.out.println(inputFiles.getPath().getName());
//打开本地输入流
FSDataInputStream in = local.open(inputFiles.getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while ((bytesRead = in.read(buffer)) > 0) {
//往hdfs上的文件写数据
out.write(buffer, 0, bytesRead);
}
//释放资源
in.close();
}
//释放资源
out.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
|