设为首页 收藏本站
查看: 620|回复: 0

[经验分享] 关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”

[复制链接]

尚未签到

发表于 2016-12-11 08:23:06 | 显示全部楼层 |阅读模式

原文链接:http://my.oschina.net/leejun2005/blog/131744


之前有童鞋问到了这样一个问题:为什么我在 reduce 阶段遍历了一次Iterable 之后,再次遍历的时候,数据都没了呢?可能有童鞋想当然的回答:Iterable
只能单向遍历一次,就这样简单的原因。。。事实果真如此吗?


还是用代码说话:



package com.test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class T {
public static void main(String[] args) {
// 只要实现了Iterable接口的对象都可以使用for-each循环。
// Iterable接口只由iterator方法构成,
// iterator()方法是java.lang.Iterable接口,被Collection继承。
/*public interface Iterable<T> {
Iterator<T> iterator();
}*/
Iterable<String> iter = new Iterable<String>() {
public Iterator<String> iterator() {
List<String> l = new ArrayList<String>();
l.add("aa");
l.add("bb");
l.add("cc");
return l.iterator();
}
};
for(int count : new int[] {1, 2}){
for (String item : iter) {
System.out.println(item);
}
System.out.println("---------->> " + count + " END.");
}
}
}









结果当然是很正常的完整无误的打印了两遍Iterable
的值。那究竟是什么原因导致了 reduce 阶段的
Iterable
只能被遍历一次呢?


我们先看一段测试代码:


测试数据:


a 3
a 4
b 50
b 60
a 70
b 8
a 9

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestIterable {
public static class M1 extends Mapper<Object, Text, Text, Text> {
private Text oKey = new Text();
private Text oVal = new Text();
String[] lineArr;
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
lineArr = value.toString().split(" ");
oKey.set(lineArr[0]);
oVal.set(lineArr[1]);
context.write(oKey, oVal);
}
}
public static class R1 extends Reducer<Text, Text, Text, Text> {
List<String> valList = new ArrayList<String>();
List<Text> textList = new ArrayList<Text>();
String strAdd;
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
valList.clear();
textList.clear();
strAdd = "";
for (Text val : values) {
valList.add(val.toString());
textList.add(val);
}
// 坑之 1 :为神马输出的全是最后一个值?why?
for(Text text : textList){
strAdd += text.toString() + ", ";
}
System.out.println(key.toString() + "\t" + strAdd);
System.out.println(".......................");
// 我这样干呢?对了吗?
strAdd = "";
for(String val : valList){
strAdd += val + ", ";
}
System.out.println(key.toString() + "\t" + strAdd);
System.out.println("----------------------");
// 坑之 2 :第二次遍历的时候为什么得到的都是空?why?
valList.clear();
strAdd = "";
for (Text val : values) {
valList.add(val.toString());
}
for(String val : valList){
strAdd += val + ", ";
}
System.out.println(key.toString() + "\t" + strAdd);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>");
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.queue.name", "regular");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
System.out.println("------------------------");
Job job = new Job(conf, "TestIterable");
job.setJarByClass(TestIterable.class);
job.setMapperClass(M1.class);
job.setReducerClass(R1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 输入输出路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}








  在 Eclipse 控制台中的结果如下:
  

a9, 9, 9, 9,
.......................
a3, 4, 70, 9,
----------------------
a
>>>>>>>>>>>>>>>>>>>>>>
b8, 8, 8,
.......................
b50, 60, 8,
----------------------
b
>>>>>>>>>>>>>>>>>>>>>>







关于第 1 个坑:对象重用(objects
reuse


reduce方法的javadoc中已经说明了会出现的问题:


The framework calls this method for each <key, (list of values)> pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse
the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of.


   也就是说虽然reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。所以如果要保存key或者value的结果,只能将其中的值取出另存或者重新clone一个对象(例如Text
store = new Text(value) 或者 String a = value.toString()
),而不能直接赋引用。因为引用从始至终都是指向同一个对象,你如果直接保存它们,那最后它们都指向最后一个输入记录。会影响最终计算结果而出错。


看到这里,我想你会恍然大悟:这不是刚毕业找工作,面试官常问的问题:String 是不可变对象但为什么能相加呢?为什么字符串相加不提倡用 String,而用 StringBuilder ?如果你还不清楚这个问题怎么回答,建议你看看这篇深入理解
String, StringBuffer 与 StringBuilder 的区别
http://my.oschina.net/leejun2005/blog/102377




关于第 2 个坑:http://stackoverflow.com/questions/6111248/iterate-twice-on-values


The Iterator you receive from that Iterable's iterator() method is special. The values may not all be in memory; Hadoop may be streaming them from disk. They aren't really backed by a Collection, so it's
nontrivial to allow multiple iterations.




最后想说明的是:hadoop 框架的作者们真的是考虑很周全,在 hadoop 框架中,不仅有对象重用,还有 JVM 重用等,节约一切可以节约的资源,提高一切可以提高的性能。因为在这种海量数据处理的场景下,性能优化是非常重要的,你可能处理100条数据体现不出性能差别,但是你面对的是千亿、万亿级别的数据呢?


PS:


我的代码是在 Eclipse 中远程调试的,所以 reduce 是没有写 hdfs 的,直接在 eclipse 终端上可以看到结果,很方便,关于怎么在 windows 上远程调试 hadoop,请参考这里实战
windows7 下 eclipse 远程调试 linux hadoop
http://my.oschina.net/leejun2005/blog/122775


REF:


hadoop中迭代器的对象重用问题


http://paddy-w.iyunv.com/blog/1514595


关于 hadoop 中 JVM 重用和对象重用的介绍


http://wikidoop.com/wiki/Hadoop/MapReduce/Reducer

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312535-1-1.html 上篇帖子: Sqoop 1.99.3 with hadoop-2.3.0 使用1 下篇帖子: (转)【Hadoop代码笔记】通过JobClient对Jobtracker的调用详细了解Hadoop RPC
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表