设为首页 收藏本站
查看: 914|回复: 0

[经验分享] Hadoop 使用Combiner提高Map/Reduce程序效率

[复制链接]

尚未签到

发表于 2016-12-11 06:34:11 | 显示全部楼层 |阅读模式
众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。
在上述过程中,我们看到至少两个性能瓶颈:


  • 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
  • 使用专利中的国家一项来阐述数据倾斜这个定义。这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。

  Hadoop通过使用一个介于Mapper和Reducer之间的Combiner步骤来解决上述瓶颈。你可以将Combiner视为Reducer的一个帮手,它主要是为了削减Mapper的输出从而减少网
络带宽和Reducer之上的负载。如果我们定义一个Combiner,MapReducer框架会对中间数据多次地使用它进行处理。
  如果Reducer只运行简单的分布式方法,例如最大值、最小值、或者计数,那么我们可以让Reducer自己作为Combiner。但许多有用的方法不是分布式的。以下我们使用求平均值作为例子进行讲解:
Mapper输出它所处理的键值对,为了使单个DataNode计算平均值Reducer会对它收到的<key,value>键值对进行排序,求和。
由于Reducer将它所收到的<key,value>键值的数目视为输入数据中的<key,value>键值对的数目,此时使用Combiner的主要障碍就是计数操作。我们可以重写MapReduce程序来明确的跟踪计数过程。
代码如下:

[java]viewplaincopy



  • packagecom;

  • importjava.io.IOException;

  • importorg.apache.hadoop.conf.Configuration;
  • importorg.apache.hadoop.conf.Configured;
  • importorg.apache.hadoop.fs.Path;
  • importorg.apache.hadoop.io.DoubleWritable;
  • importorg.apache.hadoop.io.LongWritable;
  • importorg.apache.hadoop.io.Text;
  • importorg.apache.hadoop.mapreduce.Job;
  • importorg.apache.hadoop.mapreduce.Mapper;
  • importorg.apache.hadoop.mapreduce.Reducer;
  • importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  • importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  • importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  • importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  • importorg.apache.hadoop.util.Tool;
  • importorg.apache.hadoop.util.ToolRunner;

  • publicclassAveragingWithCombinerextendsConfiguredimplementsTool{

  • publicstaticclassMapClassextendsMapper<LongWritable,Text,Text,Text>{

  • staticenumClaimsCounters{MISSING,QUOTED};
  • //MapMethod
  • publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{
  • Stringfields[]=value.toString().split(",",-20);
  • Stringcountry=fields[4];
  • StringnumClaims=fields[8];

  • if(numClaims.length()>0&&!numClaims.startsWith("\"")){
  • context.write(newText(country),newText(numClaims+",1"));
  • }
  • }
  • }

  • publicstaticclassReduceextendsReducer<Text,Text,Text,DoubleWritable>{

  • //ReduceMethod
  • publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
  • doublesum=0;
  • intcount=0;
  • for(Textvalue:values){
  • Stringfields[]=value.toString().split(",");
  • sum+=Double.parseDouble(fields[0]);
  • count+=Integer.parseInt(fields[1]);
  • }
  • context.write(key,newDoubleWritable(sum/count));
  • }
  • }

  • publicstaticclassCombineextendsReducer<Text,Text,Text,Text>{

  • //ReduceMethod
  • publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
  • doublesum=0;
  • intcount=0;
  • for(Textvalue:values){
  • Stringfields[]=value.toString().split(",");
  • sum+=Double.parseDouble(fields[0]);
  • count+=Integer.parseInt(fields[1]);
  • }
  • context.write(key,newText(sum+","+count));
  • }
  • }

  • //runMethod
  • publicintrun(String[]args)throwsException{
  • //CreateandRuntheJob
  • Jobjob=newJob();
  • job.setJarByClass(AveragingWithCombiner.class);

  • FileInputFormat.addInputPath(job,newPath(args[0]));
  • FileOutputFormat.setOutputPath(job,newPath(args[1]));

  • job.setJobName("AveragingWithCombiner");
  • job.setMapperClass(MapClass.class);
  • job.setCombinerClass(Combine.class);
  • job.setReducerClass(Reduce.class);
  • job.setInputFormatClass(TextInputFormat.class);
  • job.setOutputFormatClass(TextOutputFormat.class);

  • job.setOutputKeyClass(Text.class);
  • job.setOutputValueClass(Text.class);

  • System.exit(job.waitForCompletion(true)?0:1);
  • return0;
  • }

  • publicstaticvoidmain(String[]args)throwsException{
  • intres=ToolRunner.run(newConfiguration(),newAveragingWithCombiner(),args);
  • System.exit(res);
  • }

  • }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312408-1-1.html 上篇帖子: Hadoop家族安装系列(2)——安装Mahout0.9框架 下篇帖子: hadoop,hbase,zookeeper错误日志及部分解决办法(1)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表