设为首页 收藏本站
查看: 2001|回复: 0

[经验分享] Hadoop(21)附录D.1 优化后的重分区框架

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-3-24 14:48:13 | 显示全部楼层 |阅读模式
附录D.1 优化后的重分区框架
Hadoop社区连接包需要将每个键的所有值都读取到内存中。如何才能在reduce端的连接减少内存开销呢?本文提供的优化中,只需要缓存较小的数据集,然后在连接中遍历较大数据集中的数据。这个方法中还包括针对map的输出数据的次排序,那么reducer先接收到较小的数据集,然后接收到较大的数据集。图D.1是这个过程的流程图。







图D.2是实现的类图。类图中包含两个部分,一个通用框架和一些类的实现样例。









连接框架

我们以和Hadoop社区连接包的近似的风格编写连接的代码。目标是创建可以处理任意数据集的通用重分区机制。为简洁起见,我们重点说明主要部分。

首先是OptimizedDataJoinMapperBase类。这个类的作用是辨认出较小的数据集,并生成输出键和输出值。Configure方法在mapper创建时被调用。Configure方法的作用之一是标识每一个数据集,让reducer可以区分数据的源数据集。另一个作用是辨认当前的输入数据是否是较小的数据集。




1 protected abstract Text generateInputTag(String inputFile);
2
3 protected abstract boolean isInputSmaller(String inputFile);
4
5 public void configure(JobConf job) {
6
7     this.inputFile = job.get("map.input.file");
8     this.inputTag = generateInputTag(this.inputFile);
9     
10     if(isInputSmaller(this.inputFile)) {
11         smaller = new BooleanWritable(true);
12         outputKey.setOrder(0);
13     } else {
14         smaller = new BooleanWritable(false);
15         outputKey.setOrder(1);
16     }
17 }



Map方法首先调用自定义的方法 (generateTaggedMapOutput) 来生成OutputValue对象。这个对象包含了在连接中需要使用的值(也可能包含了最终输出的值),和一个标识较大或较小数据集的布尔值。如果map方法可以调用自定义的方法 (generateGroupKey) 来得到可以在连接中使用的键,那么这个键就作为map的输出键。




1 protected abstract OptimizedTaggedMapOutput generateTaggedMapOutput(Object value);
2
3 protected abstract String generateGroupKey(Object key, OptimizedTaggedMapOutput aRecord);
4
5 public void map(Object key, Object value, OutputCollector output, Reporter reporter)
6     throws IOException {
7     
8     OptimizedTaggedMapOutput aRecord = generateTaggedMapOutput(value);
9     
10     if (aRecord == null) {
11         return;
12     }
13     
14     aRecord.setSmaller(smaller);
15     String groupKey = generateGroupKey(aRecord);
16     
17     if (groupKey == null) {
18         return;
19     }
20     
21     outputKey.setKey(groupKey);
22     output.collect(outputKey, aRecord);
23 }



图D.3 说明了map输出的组合键(composite 可以)和组合值。次排序将会根据连接键(join key)进行分区,并用整个组合键来进行排序。组合键包括一个标识源数据集(较大或较小)的整形值,因此可以根据这个整形值来保证较小源数据集的值先于较大源数据的值被reduce接收。







下一步是深入reduce。此前已经可以保证较小源数据集的值将会先于较大源数据集的值被接收。这里就可以将所有的较小源数据集的值放到缓存中。在开始接收较大源数据集的值的时候,就开始和缓存中的值做连接操作。




1 public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter)
2     throws IOException {
3
4     CompositeKey k = (CompositeKey) key;
5     List<OptimizedTaggedMapOutput> smaller = new ArrayList<OptimizedTaggedMapOutput>();
6     
7     while (values.hasNext()) {
8         Object value = values.next();
9         OptimizedTaggedMapOutput cloned =((OptimizedTaggedMapOutput) value).clone(job);
10         
11         if (cloned.isSmaller().get()) {
12             smaller.add(cloned);
13         } else {
14             joinAndCollect(k, smaller, cloned, output, reporter);
15         }
16     }
17 }



方法joinAndCollect包含了两个数据集的值,并输出它们。




1 protected abstract OptimizedTaggedMapOutput combine(
2                         String key,
3                         OptimizedTaggedMapOutput value1,
4                         OptimizedTaggedMapOutput value2);
5                        
6 private void joinAndCollect(CompositeKey key,
7                             List<OptimizedTaggedMapOutput> smaller,
8                             OptimizedTaggedMapOutput value,
9                             OutputCollector output,
10                             Reporter reporter)
11     throws IOException {
12     
13     if (smaller.size() < 1) {
14         OptimizedTaggedMapOutput combined = combine(key.getKey(), null, value);
15         collect(key, combined, output, reporter);
16     } else {
17         for (OptimizedTaggedMapOutput small : smaller) {
18             OptimizedTaggedMapOutput combined = combine(key.getKey(), small, value);
19             collect(key, combined, output, reporter);
20         }
21     }
22 }



这些就是这个框架的主要内容。第4章介绍能如何使用这个框架。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-16117-1-1.html 上篇帖子: Hadoop(20)附录A.10 压缩格式LZOP编译安装配置 下篇帖子: hadoop 异常处理实例(一)hadoop内存配置项
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表