设为首页 收藏本站
查看: 1385|回复: 0

Hadoop 之 Secondary Sort介绍

[复制链接]

尚未签到

发表于 2015-11-11 14:59:02 | 显示全部楼层 |阅读模式
Hadoop 之 Secondary Sort介绍
  ---------------------------
  我们知道,在reduce之前,MP框架会对收到的<K,V>对按K进行排序,而对于一个特定的K来说,它的List<V>是没有被排过序的,就是说这些V是无序的,因为它们来自不同的Map端,而且很多应用也不依赖于K所对应的list<V>的顺序,但是有一些应用就要就要依赖于相同K的V的顺序,而且还要把他们聚合在一起,下面会提出这样一个问题,是参考Hadoop The Defiinitive Guide的第八章。

1. 问题的提出

对于如下数据,我们要计算出每一年的最高温度&#20540;:

(1900,34)
(1900,32)
....
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)

  计算结果可能如下:

1901 317
1902 244
1903 289
1904 256
...
1949 111

我们一般的方法是把key设置成年份,把value设置成温度,在reduce的时候去遍历所有相同key的value&#20540;,找出最大的那个&#20540;,在Reduce返回的时候,只collect这个最大的&#20540;,这是一种办法,但是这种办法在效率上相对比较差,也不够灵活,下面我们来看看怎么使用Secondary Sort来解决这个问题





2. Secondary Sort

Secondary Sort 实际上就是一种对Value进行二次排序,然后按key的特定部分进行聚合的方法,这里用到了一个组合Key的概念,就是把K与要排序的Value组合在一起,生成一个新的Key&#20540;,上面的例子中,新的组合key为<1900,32>,也就是<年份,温度>的组合,(1900, 35°C),(1900, 34°C),这样组合以后,生成一个新的key,但是这样组合以后,它们会被切分到不同的Reduce上,所以我们这里要写一个根据新组合的key的第一个参数(年份)来进行相应的partitioner,在JobConf中可以使用setPartitionerClass来进行设置,这样可以解决相同年份的key会被聚合在同一个Reduce上,但是还没有解析在同一个Reduce上,把部分key相同的记录聚合(group)在一起,所以这里我们要设置一个group的比较器,这样就可以把相同年份的记录聚合在一起,但对于相同key(这里是指key中第一个参数相同)的排序问题,我们要使用一个KeyComparator比较器来做,就是在group中对key进行二次排序,在上面例子中就是按key中第二个参数温度来降序排序,这里要注意的是这里的输入是key,而不是value,这就是我们为什么把value组合在key一起的原因,而写这个比较方法的时候,还要注意一定要符合Group方法的原因,如果group是按key的第一个参数来得,那这里key的比较就要在第一个参数相同的情况下,才能会第二个参数(value)进行比较,我想这里解释了为什么这种排序叫Secondary
Sort的原因吧,在上面的例子中,key的比较是先比较第一个参数(年份),如果第一个参数相同,再比较第二个参数(温度),按第二个参数降序排列。





所以一般要使用Secondary Sort,在JobConf要配置这三个参数


  • setPartitionerClass                // 这个是用来设置key的切分,上面例子中是按key中的第一个参数来切分
  • setOutputValueGroupingComparator   // 这里设置group,就是按key的哪一个参数进行聚合,上面的例子中也是按第一个参数年份进行聚合
  • setOutputKeyComparatorClass        // 这个是设置key的比较器,设置聚合的key的一个二次排序方法





3. 代码分析

Example 8-9. Application to find the maximum temperature by sorting temperatures in the key

public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {

// Map任务
static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
public void map(LongWritable key, Text value,
OutputCollector<IntPair, NullWritable> output, Reporter reporter)
throws IOException {
parser.parse(value);   // 解析输入的文本
if (parser.isValidTemperature()) {
// 这里把年份与温度组合成一个key,value为空
output.collect(new IntPair(parser.getYearInt(),+ parser.getAirTemperature()), NullWritable.get());
}
}
}

// Reduce任务
static class MaxTemperatureReducer extends MapReduceBase
implements Reducer<IntPair, NullWritable, IntPair, NullWritable> {
public void reduce(IntPair key, Iterator<NullWritable> values,
OutputCollector<IntPair, NullWritable> output, Reporter reporter)
throws IOException {
// 输出聚合的key值,这里的key是先按年份进行聚合,所我们会看到相同所有年份相同的key会聚合在一起,而这些聚合后的key按温度进行降序按列
// 所以聚合中第一个key为温度最高的,所以这里输出的key为这一年中温度最高的值
output.collect(key, NullWritable.get());
}
}

// 切分器,这里是按年份* 127 % reduceNum来进行切分的
public static class FirstPartitioner
implements Partitioner<IntPair, NullWritable> {
@Override
public void configure(JobConf job) {}
@Override
public int getPartition(IntPair key, NullWritable value, int numPartitions) {
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}

// 聚合key的一个比较器
public static class KeyComparator extends WritableComparator {
protected KeyComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
// 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数
// 这里是先比较年份,再比较温度,按温度降序排序
int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
if (cmp != 0) {
return cmp;
}
return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse
}
}
// 设置聚合比较器
public static class GroupComparator extends WritableComparator {
protected GroupComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
// 这里是按key的第一个参数来聚合,就是年份
return IntPair.compare(ip1.getFirst(), ip2.getFirst());
}
}
@Override
public int run(String[] args) throws IOException {
JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (conf == null) {
return -1;
}
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setPartitionerClass(FirstPartitioner.class);     // 设置切分器
conf.setOutputKeyComparatorClass(KeyComparator.class); // 设置key的比较器
conf.setOutputValueGroupingComparator(GroupComparator.class); // 设置聚合比较器
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(IntPair.class);// 设置key的一个组合类型,如里这个类型实现了WritableComparable<T>的话,那就不要设置setOutputKeyComparatorClass了.
conf.setOutputValueClass(NullWritable.class);  // 输出的value为NULL,因为这里的实际value已经组合到了key中
JobClient.runJob(conf);
return 0;
}

public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
System.exit(exitCode);
}
}


输出结果如下:



% hadoop jar job.jar MaxTemperatureUsingSecondarySort input/ncdc/all \
> output-secondarysort
% hadoop fs -cat output-secondarysort/part-* | sort | head
1901    317
1902    244
1903    289
1904    256
1905    283
1906    294
1907    283
1908    289

要注意的是有时候用户会存储这些value,所以你要在Map的时候输出这些value,而不是上面的NULL,而输出的这些value是经过排序的。



4 参考


  • http://www.riccomini.name/Topics/DistributedComputing/Hadoop/SortByValue/
  • Hadoop The definitive Guide Chapter 8
  • https://issues.apache.org/jira/secure/attachment/12356648/485.patch
  • https://issues.apache.org/jira/browse/HADOOP-4545





版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-137999-1-1.html 上篇帖子: 基于Greenplum Hadoop分布式平台的大数据解决方案及商业应用案例剖析 下篇帖子: Virtualbox安装Ubuntu13.04并搭建Hadoop环境(单机模式+伪分布模式)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表