设为首页 收藏本站
查看: 1011|回复: 0

[经验分享] Hadoop日记Day18---MapReduce排序分组

[复制链接]

尚未签到

发表于 2015-7-14 08:46:25 | 显示全部楼层 |阅读模式
  本节所用到的数据下载地址为:http://pan.baidu.com/s/1bnfELmZ

MapReduce的排序分组任务与要求
  我们知道排序分组是MapReduce中Mapper端的第四步,其中分组排序都是基于Key的,我们可以通过下面这几个例子来体现出来。其中的数据和任务如下图1.1,1.2所示。



#首先按照第一列升序排列,当第一列相同时,第二列升序排列
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#结果
1    1
2    1
2    2
3    1
3    2
3    3
  图 1.1 排序



#当第一列相同时,求出第二列的最小值
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#结果
3    1
2    1
1    1
  图 1.2 分组

一、 排序算法

1.1 MapReduce默认排序算法
  使用MapReduce默认排序算法代码如下1.1所示,在代码中我将第一列作为键,第二列作为值。


DSC0000.gif DSC0001.gif


1 package sort;
2
3 import java.io.IOException;
4 import java.net.URI;
5
6 import org.apache.hadoop.conf.Configuration;
7 import org.apache.hadoop.fs.FileStatus;
8 import org.apache.hadoop.fs.FileSystem;
9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.LongWritable;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
20
21 public class SortApp {
22     private static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
23     private static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
24     public static void main(String[] args) throws Exception {
25         Configuration conf=new Configuration();
26         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
27         final Path outpath = new Path(OUT_PATH);
28         if(fileSystem.exists(outpath)){
29             fileSystem.delete(outpath,true);
30         }
31         
32         final Job job = new Job(conf,SortApp.class.getSimpleName());
33         
34         //1.1 指定输入文件路径
35         FileInputFormat.setInputPaths(job, INPUT_PATH);        
36         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
37                 
38         //1.2指定自定义的Mapper类
39         job.setMapperClass(MyMapper.class);        
40         job.setMapOutputKeyClass(LongWritable.class);//指定输出的类型
41         job.setMapOutputValueClass(LongWritable.class);
42                 
43         //1.3 指定分区类
44         job.setPartitionerClass(HashPartitioner.class);
45         job.setNumReduceTasks(1);
46                 
47         //1.4 TODO 排序、分区
48                 
49         //1.5  TODO (可选)合并
50                 
51         //2.2 指定自定义的reduce类
52         job.setReducerClass(MyReducer.class);        
53         job.setOutputKeyClass(LongWritable.class);//指定输出的类型
54         job.setOutputValueClass(LongWritable.class);
55                 
56         //2.3 指定输出到哪里
57         FileOutputFormat.setOutputPath(job, outpath);        
58         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类                        
59         job.waitForCompletion(true);//把代码提交给JobTracker执行        
60     }
61     static class MyMapper extends Mapper{
62
63         @Override
64         protected void map(
65                 LongWritable key,
66                 Text value,
67                 Mapper.Context context)
68                 throws IOException, InterruptedException {
69             final String[] splited = value.toString().split("\t");
70             final long k2 = Long.parseLong(splited[0]);
71             final long v2 = Long.parseLong(splited[1]);
72             context.write(new LongWritable(k2),new LongWritable(v2));
73         }   
74     }
75     static class MyReducer extends Reducer{
76
77         @Override
78         protected void reduce(
79                 LongWritable k2,
80                 Iterable v2s,
81                 Reducer.Context context)
82                 throws IOException, InterruptedException {
83             for(LongWritable v2:v2s){
84                 context.write(k2, v2);
85             }            
86         }   
87     }
88 }
View Code  代码 1.1
  运行结果如下图1.3所示



1    1
2    2
2    1
3    3
3    2
3    1
  图 1.3
  从上面图中运行结果可以看出,MapReduce默认排序算法只对Key进行了排序,并没有对value进行排序,没有达到我们的要求,所以要实现我们的要求,还要我们自定义一个排序算法

1.2 自定义排序算法
  从上面图中运行结果可以知道,MapReduce默认排序算法只对Key进行了排序,并没有对value进行排序,没有达到我们的要求,所以要实现我们的要求,还要我们自定义一个排序算法。在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类作为k 2 ,才能参与比较。所以在这里我们新建一个新的类型NewK2类型来封装原来的k2和v2。代码如1.2所示。





  1 package sort;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.net.URI;
  7
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.io.LongWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.io.WritableComparable;
14 import org.apache.hadoop.mapreduce.Job;
15 import org.apache.hadoop.mapreduce.Mapper;
16 import org.apache.hadoop.mapreduce.Reducer;
17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
20 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
21 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
22
23 public class SortApp {
24     static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
25     static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
26     public static void main(String[] args) throws Exception{
27         final Configuration configuration = new Configuration();
28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
29         if(fileSystem.exists(new Path(OUT_PATH))){
30             fileSystem.delete(new Path(OUT_PATH), true);
31         }
32         final Job job = new Job(configuration, SortApp.class.getSimpleName());
33         //1.1 指定输入文件路径
34         FileInputFormat.setInputPaths(job, INPUT_PATH);        
35         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
36         
37         //1.2指定自定义的Mapper类
38         job.setMapperClass(MyMapper.class);        
39         job.setMapOutputKeyClass(NewK2.class);//指定输出的类型
40         job.setMapOutputValueClass(LongWritable.class);
41         
42         //1.3 指定分区类
43         job.setPartitionerClass(HashPartitioner.class);
44         job.setNumReduceTasks(1);
45         
46         //2.2 指定自定义的reduce类
47         job.setReducerClass(MyReducer.class);        
48         job.setOutputKeyClass(LongWritable.class);//指定输出的类型
49         job.setOutputValueClass(LongWritable.class);
50         
51         //2.3 指定输出到哪里
52         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));        
53         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类
54         job.waitForCompletion(true);//把代码提交给JobTracker执行
55     }
56
57     
58     static class MyMapper extends Mapper{
59         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws java.io.IOException ,InterruptedException {
60             final String[] splited = value.toString().split("\t");
61             final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
62             final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
63             context.write(k2, v2);
64         };
65     }
66     
67     static class MyReducer extends Reducer{
68         protected void reduce(NewK2 k2, java.lang.Iterable v2s, org.apache.hadoop.mapreduce.Reducer.Context context) throws java.io.IOException ,InterruptedException {
69             context.write(new LongWritable(k2.first), new LongWritable(k2.second));
70         };
71     }
72     
73     /**
74      * 问:为什么实现该类?
75      * 答:因为原来的v2不能参与排序,把原来的k2和v2封装到一个类中,作为新的k2
76      *
77      */
78     static class  NewK2 implements WritableComparable{
79         Long first;
80         Long second;
81         
82         public NewK2(){}
83         
84         public NewK2(long first, long second){
85             this.first = first;
86             this.second = second;
87         }
88         
89         
90         @Override
91         public void readFields(DataInput in) throws IOException {
92             this.first = in.readLong();
93             this.second = in.readLong();
94         }
95
96         @Override
97         public void write(DataOutput out) throws IOException {
98             out.writeLong(first);
99             out.writeLong(second);
100         }
101
102         /**
103          * 当k2进行排序时,会调用该方法.
104          * 当第一列不同时,升序;当第一列相同时,第二列升序
105          */
106         @Override
107         public int compareTo(NewK2 o) {
108             final long minus = this.first - o.first;
109             if(minus !=0){
110                 return (int)minus;
111             }
112             return (int)(this.second - o.second);
113         }
114         
115         @Override
116         public int hashCode() {
117             return this.first.hashCode()+this.second.hashCode();
118         }
119         
120         @Override
121         public boolean equals(Object obj) {
122             if(!(obj instanceof NewK2)){
123                 return false;
124             }
125             NewK2 oK2 = (NewK2)obj;
126             return (this.first==oK2.first)&&(this.second==oK2.second);
127         }
128     }
129     
130 }
View Code  代码 1.2
  从上面的代码中我们可以发现,我们的新类型NewK2实现了WritableComparable接口,其中该接口中有一个compareTo()方法,当对关键字进行比较会调用该方法,而我们就在该方法中实现了我们想要做的事。
  运行结果如下图1.4所示。



1    1
2    1
2    2
3    1
3    2
3    3
  图 1.4

二、分组算法

2.1 MapReduce默认分组
  分组是在MapReduce中Mapper端的第四步,分组也是基于Key进行的,将相同key的value放到一个集合中去。还以上面排序代码为例,业务逻辑如下图2.1所示。在代码中以NewK2为关键字,每个键都不相同,所以会将数据分为六组,这样就不能实现我们的业务要求,但利用自定义类型NewK2,可以自定义排序算法的同时我们也可以自定义分组算法。



#当第一列相同时,求出第二列的最小值
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#结果
3    1
2    1
1    1
  图 2.1

2.2 自定义分组比较器
  由于业务要求分组是按照第一列分组,但是NewK2的比较规则决定了不能按照第一列分,只能自定义分组比较器,代码如下2.1所示。





  1 package group;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.net.URI;
  7
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.io.LongWritable;
12 import org.apache.hadoop.io.RawComparator;
13 import org.apache.hadoop.io.Text;
14 import org.apache.hadoop.io.WritableComparable;
15 import org.apache.hadoop.io.WritableComparator;
16 import org.apache.hadoop.mapreduce.Job;
17 import org.apache.hadoop.mapreduce.Mapper;
18 import org.apache.hadoop.mapreduce.Reducer;
19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
20 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
22 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
23 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
24
25 public class GroupApp {
26     static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
27     static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
28     public static void main(String[] args) throws Exception{
29         final Configuration configuration = new Configuration();
30         
31         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
32         if(fileSystem.exists(new Path(OUT_PATH))){
33             fileSystem.delete(new Path(OUT_PATH), true);
34         }        
35         final Job job = new Job(configuration, GroupApp.class.getSimpleName());
36         
37         //1.1 指定输入文件路径
38         FileInputFormat.setInputPaths(job, INPUT_PATH);        
39         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
40         
41         //1.2指定自定义的Mapper类
42         job.setMapperClass(MyMapper.class);        
43         job.setMapOutputKeyClass(NewK2.class);//指定输出的类型
44         job.setMapOutputValueClass(LongWritable.class);
45         
46         //1.3 指定分区类
47         job.setPartitionerClass(HashPartitioner.class);
48         job.setNumReduceTasks(1);
49         
50         //1.4 TODO 排序、分区
51         job.setGroupingComparatorClass(MyGroupingComparator.class);
52         //1.5  TODO (可选)合并
53         
54         //2.2 指定自定义的reduce类
55         job.setReducerClass(MyReducer.class);        
56         job.setOutputKeyClass(LongWritable.class);//指定输出的类型
57         job.setOutputValueClass(LongWritable.class);
58         
59         //2.3 指定输出到哪里
60         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));        
61         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类        
62         job.waitForCompletion(true);//把代码提交给JobTracker执行
63     }
64
65     
66     static class MyMapper extends Mapper{
67         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws java.io.IOException ,InterruptedException {
68             final String[] splited = value.toString().split("\t");
69             final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
70             final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
71             context.write(k2, v2);
72         };
73     }
74     
75     static class MyReducer extends Reducer{
76         protected void reduce(NewK2 k2, java.lang.Iterable v2s, org.apache.hadoop.mapreduce.Reducer.Context context) throws java.io.IOException ,InterruptedException {
77             long min = Long.MAX_VALUE;
78             for (LongWritable v2 : v2s) {
79                 if(v2.get()max){
46                 max = temp;
47             }
48         };
49         
50         protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) throws java.io.IOException ,InterruptedException {
51             context.write(new LongWritable(max), NullWritable.get());
52         };
53     }
54
55     static class MyReducer extends Reducer{
56         long max = Long.MIN_VALUE;
57         protected void reduce(LongWritable k2, java.lang.Iterable arg1, org.apache.hadoop.mapreduce.Reducer.Context arg2) throws java.io.IOException ,InterruptedException {
58             final long temp = k2.get();
59             if(temp>max){
60                 max = temp;
61             }
62         };
63         
64         protected void cleanup(org.apache.hadoop.mapreduce.Reducer.Context context) throws java.io.IOException ,InterruptedException {
65             context.write(new LongWritable(max), NullWritable.get());
66         };
67     }        
68 }
View Code  代码3.1
  运行结果为:32767,也就是我们数据中的最大值

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-86445-1-1.html 上篇帖子: Hadoop验证文件系统的健康 下篇帖子: Hadoop的运行痕迹
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表