The data used in this section can be downloaded from: http://pan.baidu.com/s/1bnfELmZ
Sorting and Grouping Tasks and Requirements in MapReduce
As we know, sorting and grouping happen in the fourth step on the Mapper side of MapReduce, and both are carried out on the key. The examples below make this concrete; the data and the tasks are shown in Figures 1.1 and 1.2.
# First sort by the first column in ascending order; when the first column is equal, sort by the second column in ascending order
3 3
3 2
3 1
2 2
2 1
1 1
-------------------
# Result
1 1
2 1
2 2
3 1
3 2
3 3
Figure 1.1 Sorting
# When the first column is the same, find the minimum of the second column
3 3
3 2
3 1
2 2
2 1
1 1
-------------------
# Result
3 1
2 1
1 1
Figure 1.2 Grouping
1. The Sorting Algorithm
1.1 MapReduce's default sorting
The code that relies on MapReduce's default sorting is shown in Code 1.1 below; in it the first column is used as the key and the second column as the value.
package sort;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SortApp {
    private static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
    private static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outpath = new Path(OUT_PATH);
        if (fileSystem.exists(outpath)) {
            fileSystem.delete(outpath, true);
        }

        final Job job = new Job(conf, SortApp.class.getSimpleName());

        // 1.1 input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class); // class that parses the input file

        // 1.2 custom Mapper class and its output types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 1.3 partitioner
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 1.4 TODO sorting, grouping

        // 1.5 TODO (optional) combiner

        // 2.2 custom reduce class and its output types
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // 2.3 output path
        FileOutputFormat.setOutputPath(job, outpath);
        job.setOutputFormatClass(TextOutputFormat.class); // class that formats the output file
        job.waitForCompletion(true); // submit the job to the JobTracker
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line holds two tab-separated columns
            final String[] splited = value.toString().split("\t");
            final long k2 = Long.parseLong(splited[0]);
            final long v2 = Long.parseLong(splited[1]);
            context.write(new LongWritable(k2), new LongWritable(v2));
        }
    }

    static class MyReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(LongWritable k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v2 : v2s) {
                context.write(k2, v2);
            }
        }
    }
}
Code 1.1
The output is shown in Figure 1.3 below.
1 1
2 2
2 1
3 3
3 2
3 1
Figure 1.3
The output shows that MapReduce's default sort only orders the keys; the values are left unsorted, so the requirement is not met. To meet it we have to define our own sort.
1.2 A custom sorting algorithm
In the map and reduce phases it is k2 that gets compared during the sort; v2 never takes part in the comparison. If we want v2 to be sorted as well, we have to wrap k2 and v2 together in a new class and use that class as the new k2, so that both fields take part in the comparison. We therefore create a new type, NewK2, that encapsulates the original k2 and v2, as shown in Code 1.2.
package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SortApp {
    static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
    static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";

    public static void main(String[] args) throws Exception {
        final Configuration configuration = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
        if (fileSystem.exists(new Path(OUT_PATH))) {
            fileSystem.delete(new Path(OUT_PATH), true);
        }
        final Job job = new Job(configuration, SortApp.class.getSimpleName());

        // 1.1 input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class); // class that parses the input file

        // 1.2 custom Mapper class; the map output key is now the composite type NewK2
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 1.3 partitioner
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 2.2 custom reduce class and its output types
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // 2.3 output path
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class); // class that formats the output file
        job.waitForCompletion(true); // submit the job to the JobTracker
    }

    static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            // both columns go into the new key; the second column is also kept as the value
            final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
            final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
            context.write(k2, v2);
        }
    }

    static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(k2.first), new LongWritable(k2.second));
        }
    }

    /**
     * Q: Why is this class needed?
     * A: The original v2 cannot take part in the sort, so the original k2 and v2
     *    are wrapped into one class which is used as the new k2.
     */
    static class NewK2 implements WritableComparable<NewK2> {
        Long first;
        Long second;

        public NewK2() {}

        public NewK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        /**
         * Called when k2 values are sorted: ascending by the first column;
         * when the first column is equal, ascending by the second column.
         */
        @Override
        public int compareTo(NewK2 o) {
            final long minus = this.first - o.first;
            if (minus != 0) {
                return (int) minus;
            }
            return (int) (this.second - o.second);
        }

        @Override
        public int hashCode() {
            return this.first.hashCode() + this.second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 oK2 = (NewK2) obj;
            return (this.first == oK2.first) && (this.second == oK2.second);
        }
    }
}
Code 1.2
As the code shows, the new type NewK2 implements the WritableComparable interface. That interface declares a compareTo() method, which is called whenever keys are compared, and it is inside this method that we implement the ordering we want.
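One detail worth noting about the listing above: compareTo() returns the difference of two long values cast to int, and equals() compares the boxed Long fields with ==. Both are fine for the small values in this example, but casting the long difference to int can truncate it and even flip its sign for large keys, and == compares object identity once values leave the small-integer cache. A safer drop-in variant of the two methods in NewK2 (a sketch added here, not part of the original listing) would be:

        @Override
        public int compareTo(NewK2 o) {
            // Long.compare cannot overflow, unlike casting a long difference to int
            final int cmp = Long.compare(this.first, o.first);
            if (cmp != 0) {
                return cmp;
            }
            return Long.compare(this.second, o.second);
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 other = (NewK2) obj;
            // compare the numeric values rather than the boxed object references
            return this.first.longValue() == other.first.longValue()
                    && this.second.longValue() == other.second.longValue();
        }

With the sample data the output is identical either way.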
The output is shown in Figure 1.4 below.
1 1
2 1
2 2
3 1
3 2
3 3
Figure 1.4
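A side note on the composite key, not covered in the original text: the job runs with job.setNumReduceTasks(1), so the default HashPartitioner is harmless here. With several reduce tasks, however, HashPartitioner would hash the whole NewK2 key, and rows that share a first column could be sent to different reducers. A minimal sketch of a partitioner that keeps such rows together, assuming the NewK2 type from Code 1.2 and a hypothetical class name, nested in SortApp like the other helper classes:

    static class FirstColumnPartitioner extends org.apache.hadoop.mapreduce.Partitioner<NewK2, LongWritable> {
        @Override
        public int getPartition(NewK2 key, LongWritable value, int numPartitions) {
            // route by the first column only; the sample data is non-negative, so the modulo is safe
            return (int) (key.first % numPartitions);
        }
    }

It would be registered with job.setPartitionerClass(FirstColumnPartitioner.class) in place of HashPartitioner.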
2. The Grouping Algorithm
2.1 MapReduce's default grouping
Grouping is also part of the fourth step on the Mapper side of MapReduce, and it too is based on the key: the values of equal keys are collected into one group. Taking the sorting code above as the starting point, the business logic is shown in Figure 2.1. With NewK2 as the key every key is distinct, so the data ends up in six groups, which does not satisfy the requirement. However, just as the custom type NewK2 let us customize the sort, it also lets us customize the grouping.
# When the first column is the same, find the minimum of the second column
3 3
3 2
3 1
2 2
2 1
1 1
-------------------
# Result
3 1
2 1
1 1
Figure 2.1
2.2 A custom grouping comparator
The business requirement is to group by the first column, but NewK2's comparison rule means the records are no longer grouped that way, so we have to supply our own grouping comparator. The code is shown in Code 2.1 below.
package group;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class GroupApp {
    static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
    static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";

    public static void main(String[] args) throws Exception {
        final Configuration configuration = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
        if (fileSystem.exists(new Path(OUT_PATH))) {
            fileSystem.delete(new Path(OUT_PATH), true);
        }
        final Job job = new Job(configuration, GroupApp.class.getSimpleName());

        // 1.1 input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class); // class that parses the input file

        // 1.2 custom Mapper class; the map output key is the composite type NewK2
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 1.3 partitioner
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 1.4 sorting, grouping: group by the first column only
        job.setGroupingComparatorClass(MyGroupingComparator.class);
        // 1.5 TODO (optional) combiner

        // 2.2 custom reduce class and its output types
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // 2.3 output path
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class); // class that formats the output file
        job.waitForCompletion(true); // submit the job to the JobTracker
    }

    static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
            final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
            context.write(k2, v2);
        }
    }

    static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long min = Long.MAX_VALUE;
            for (LongWritable v2 : v2s) {
                if (v2.get() < min) {
                    min = v2.get();
                }
            }
            // one record per group: the first column and the minimum of the second column
            context.write(new LongWritable(k2.first), new LongWritable(min));
        }
    }

    // Grouping comparator: two keys belong to the same reduce group when their
    // first columns are equal, regardless of the second column.
    static class MyGroupingComparator implements RawComparator<NewK2> {
        @Override
        public int compare(NewK2 o1, NewK2 o2) {
            return (int) (o1.first - o2.first);
        }

        // byte-level comparison of the serialized keys: only the first 8 bytes
        // (the first long, i.e. the first column) are compared
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }
    }

    // NewK2 is the same composite key used in Code 1.2
    static class NewK2 implements WritableComparable<NewK2> {
        Long first;
        Long second;

        public NewK2() {}

        public NewK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        // ascending by the first column; when equal, ascending by the second column
        @Override
        public int compareTo(NewK2 o) {
            final long minus = this.first - o.first;
            if (minus != 0) {
                return (int) minus;
            }
            return (int) (this.second - o.second);
        }

        @Override
        public int hashCode() {
            return this.first.hashCode() + this.second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 oK2 = (NewK2) obj;
            return (this.first == oK2.first) && (this.second == oK2.second);
        }
    }
}
Code 2.1
With the grouping comparator in place, reduce() is called once per distinct first column, and the job emits the minimum of the second column for each group, which is the result described in Figure 2.1.
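The same grouping can also be written by extending WritableComparator (already imported above) instead of implementing RawComparator directly; the framework then deserializes both keys before calling compare(). A sketch of that variant, assuming the NewK2 type above and a hypothetical class name:

    static class FirstColumnGroupingComparator extends WritableComparator {
        protected FirstColumnGroupingComparator() {
            super(NewK2.class, true); // true: create key instances so compare() sees deserialized objects
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            NewK2 k1 = (NewK2) a;
            NewK2 k2 = (NewK2) b;
            // group by the first column only
            return Long.compare(k1.first, k2.first);
        }
    }

It is registered the same way: job.setGroupingComparatorClass(FirstColumnGroupingComparator.class). The byte-level RawComparator in Code 2.1 avoids the deserialization cost, which is why it is often preferred.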
3. Finding the Maximum Value
The same machinery can be used for a simpler job: finding the largest value in a file that contains one number per line. In Code 3.1, each map task keeps a running maximum of the values it reads and writes it out exactly once in cleanup(); the reduce task then takes the maximum of those per-mapper maxima and writes it, again in cleanup().

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        long max = Long.MIN_VALUE;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line holds a single number
            final long temp = Long.parseLong(value.toString());
            if (temp > max) {
                max = temp;
            }
        }

        // emit this map task's local maximum exactly once
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        long max = Long.MIN_VALUE;

        @Override
        protected void reduce(LongWritable k2, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            final long temp = k2.get();
            if (temp > max) {
                max = temp;
            }
        }

        // emit the global maximum once all keys have been seen
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

Code 3.1
The job's output is 32767, which is the maximum value in our data.
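Code 3.1 shows only the Mapper and Reducer. A minimal sketch of a driver for such a job, assuming the two classes are nested in an enclosing class (given the hypothetical name MaxApp here) and that the input directory, also hypothetical, holds one number per line:

package max;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MaxApp {
    // hypothetical paths; the earlier examples use the same host and port
    static final String INPUT_PATH = "hdfs://hadoop:9000/maxinput";
    static final String OUT_PATH = "hdfs://hadoop:9000/maxoutput";

    public static void main(String[] args) throws Exception {
        final Configuration configuration = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
        if (fileSystem.exists(new Path(OUT_PATH))) {
            fileSystem.delete(new Path(OUT_PATH), true);
        }
        final Job job = new Job(configuration, MaxApp.class.getSimpleName());

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);            // MyMapper from Code 3.1, nested in this class
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(MyReducer.class);          // MyReducer from Code 3.1, nested in this class
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class);
        job.waitForCompletion(true);
    }

    // the MyMapper and MyReducer classes from Code 3.1 go here
}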