|
1 package zhouls.bigdata.myMapReduce.Email;
2
3 import java.io.IOException;
4 import org.apache.hadoop.conf.Configuration;
5 import org.apache.hadoop.conf.Configured;
6 import org.apache.hadoop.fs.FileSystem;
7 import org.apache.hadoop.fs.Path;
8 import org.apache.hadoop.io.IntWritable;
9 import org.apache.hadoop.io.LongWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
17 import org.apache.hadoop.util.Tool;
18 import org.apache.hadoop.util.ToolRunner;
19
20 //通过MultipleOutputs写到多个文件:参考博客http://www.cnblogs.com/codeOfLife/p/5452902.html
21
22 // MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。
23 // 这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,
24 // 其中 name 是由程序设定的任意名字, nnnnn 是一个指明块号的整数(从 0 开始)。块号保证从不同块(mapper 或 reducer)输出在相同名字情况下不会冲突。
25
26
27
28 public>
29 public static>
30 private final static IntWritable one = new IntWritable(1);
31
32 @Override
33 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
34 context.write(value, one);
35 }
36 }
37
38
39 public static>
40 private IntWritable result = new IntWritable();
41 private MultipleOutputs< Text, IntWritable> multipleOutputs;
42
43 @Override
44 protected void setup(Context context) throws IOException ,InterruptedException{
45 multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
46 }
47
48 protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException {
49 int begin = Key.toString().indexOf("@");//indexOf方法返回一个整数值,指出 String 对象内子字符串的开始位置。
50 int end = Key.toString().indexOf(".");//indexOf方法返回一个整数值,指出 String 对象内子字符串的开始位置。只不过我们自己写出个end变量而已
51 // Key.toString().indexOf(ch)
52 // Key.toString().indexOf(str)
53 // Key.toString().indexOf(ch, fromIndex)
54 // Key.toString().indexOf(str, fromIndex)
55 // Key.toString().intern()
56
57 // Java中字符串中子串的查找共有四种方法,如下:
58 // 1、int indexOf(String str) :返回第一次出现的指定子字符串在此字符串中的索引。
59 // 2、int indexOf(String str, int startIndex):从指定的索引处开始,返回第一次出现的指定子字符串在此字符串中的索引。
60 // 3、int lastIndexOf(String str) :返回在此字符串中最右边出现的指定子字符串的索引。
61 // 4、int lastIndexOf(String str, int startIndex) :从指定的索引处开始向后搜索,返回在此字符串中最后一次出现的指定子字符串的索引。
62
63
64 if(begin>=end){
65 return;
66 }
67
68 //获取邮箱类别,比如 qq
69 String name = Key.toString().substring(begin+1, end);
70 // String.subString(start,end)截取的字符串包括起点所在的字符串,不包括终点所在的字符串
71
72 int sum = 0;
73
74 for (IntWritable value : Values) {
75 sum += value.get();
76 }
77
78 result.set(sum);
79 multipleOutputs.write(Key, result, name);
80 //这里,我们用到的是multipleOutputs.write(Text key, IntWritable value, String baseOutputPath);
81
82 // multipleOutputs.write默认有3种构造方法:
83 // multipleOutputs.write(String namedOutput, K key, V value);
84 // multipleOutputs.write(Text key, IntWritable value, String baseOutputPath);
85 // multipleOutputs.write(String namedOutput, K key, V value,String baseOutputPath);
86
87 // MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。
88 // 这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。
89 // 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,
90 // 其中 name 是由程序设定的任意名字,
91 // nnnnn 是一个指明块号的整数(从 0 开始)。
92 // 块号保证从不同块(mapper 或 reducer)写的输出在相同名字情况下不会冲突。
93
94 }
95
96 @Override
97 protected void cleanup(Context context) throws IOException ,InterruptedException{
98 multipleOutputs.close();
99 }
100
101 }
102
103 public int run(String[] args) throws Exception {
104 Configuration conf = new Configuration();// 读取配置文件
105
106 Path mypath = new Path(args[1]);
107 FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径
108 if (hdfs.isDirectory(mypath)) {
109 hdfs.delete(mypath, true);
110 }
111 Job job = Job.getInstance();// 新建一个任务
112 job.setJarByClass(Email.class);// 主类
113
114 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
115 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
116
117 job.setMapperClass(MailMapper.class);// Mapper
118 job.setReducerClass(MailReducer.class);// Reducer
119
120 job.setOutputKeyClass(Text.class);// key输出类型
121 job.setOutputValueClass(IntWritable.class);// value输出类型
122
123 job.waitForCompletion(true);
124 return 0;
125 }
126
127 public static void main(String[] args) throws Exception {
128 String[] args0 = {
129 "hdfs://HadoopMaster:9000/inputData/multipleOutputFormats/mail.txt",
130 "hdfs://HadoopMaster:9000/outData/MultipleOutputFormats/" };
131 int ec = ToolRunner.run(new Configuration(), new Email(), args0);
132 System.exit(ec);
133 }
134 } |
|
|