设为首页 收藏本站
查看: 818|回复: 0

【hadoop】reducer输出多个目录

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-11-11 10:57:35 | 显示全部楼层 |阅读模式
hadoop的reducer输出多个文件

关键字: hadoop,mapreduce
有时候我们想到这样的功能: reducer能根据key(或value)值来输出多个文件,同一key(或value)处于同一个文件中。现在hadoop的0.17.x版本可以重写MultipleOutputFormat的generateFileNameForKeyValue就可以实现此功能。

比如:
Java代码

  • package org.apache.hadoop.mapred.lib;  
  •   
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.fs.FileSystem;  
  • import org.apache.hadoop.io.Writable;  
  • import org.apache.hadoop.io.WritableComparable;  
  • import org.apache.hadoop.mapred.JobConf;  
  • import org.apache.hadoop.mapred.RecordWriter;  
  • import org.apache.hadoop.mapred.TextOutputFormat;  
  • import org.apache.hadoop.util.Progressable;  
  •   
  • public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>  
  •     extends MultipleOutputFormat<K, V> {  
  •   
  •   private TextOutputFormat<K, V> theTextOutputFormat = null;  
  •   
  •   @Override  
  •   protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,  
  •       String name, Progressable arg3) throws IOException {  
  •     if (theTextOutputFormat == null) {  
  •       theTextOutputFormat = new TextOutputFormat<K, V>();  
  •     }   
  •     return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);  
  •   }   
  •   
  •     @Override  
  •     protected String generateFileNameForKeyValue(K key, V value, String name) {  
  •         return name &#43; &quot;_&quot; &#43; value.toString();  
  •     }   
  •      
  •      
  • }  
[java] viewplaincopy

  • package org.apache.hadoop.mapred.lib;  
  •   
  • import java.io.IOException;  
  •   
  • import org.apache.hadoop.fs.FileSystem;  
  • import org.apache.hadoop.io.Writable;  
  • import org.apache.hadoop.io.WritableComparable;  
  • import org.apache.hadoop.mapred.JobConf;  
  • import org.apache.hadoop.mapred.RecordWriter;  
  • import org.apache.hadoop.mapred.TextOutputFormat;  
  • import org.apache.hadoop.util.Progressable;  
  •   
  • public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>  
  •     extends MultipleOutputFormat<K, V> {  
  •   
  •   private TextOutputFormat<K, V> theTextOutputFormat = null;  
  •   
  •   @Override  
  •   protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,  
  •       String name, Progressable arg3) throws IOException {  
  •     if (theTextOutputFormat == null) {  
  •       theTextOutputFormat = new TextOutputFormat<K, V>();  
  •     }  
  •     return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);  
  •   }  
  •   
  •     @Override  
  •     protected String generateFileNameForKeyValue(K key, V value, String name) {  
  •         return name &#43; &quot;_&quot; &#43; value.toString();  
  •     }  
  •    
  •    
  • }  


试一下wordcount这个例子,把WordCount.java的run函数加上一行
conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);

Java代码

  • public int run(String[] args) throws Exception {  
  •     JobConf conf = new JobConf(getConf(), WordCount.class);  
  •     conf.setJobName(&quot;wordcount&quot;);  
  •    
  •     // the keys are words (strings)   
  •     conf.setOutputKeyClass(Text.class);  
  •     // the values are counts (ints)   
  •     conf.setOutputValueClass(IntWritable.class);  
  •       
  •     conf.setMapperClass(MapClass.class);         
  •     conf.setCombinerClass(Reduce.class);  
  •     conf.setReducerClass(Reduce.class);  
  •       
  •     conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);  
  •       
  •     List<String> other_args = new ArrayList<String>();  
  •     for(int i=0; i < args.length; &#43;&#43;i) {  
  •       try {  
  •         if (&quot;-m&quot;.equals(args)) {  
  •           conf.setNumMapTasks(Integer.parseInt(args[&#43;&#43;i]));   
  •         } else if (&quot;-r&quot;.equals(args)) {  
  •           conf.setNumReduceTasks(Integer.parseInt(args[&#43;&#43;i]));   
  •         } else {  
  •           other_args.add(args);   
  •         }   
  •       } catch (NumberFormatException except) {  
  •         System.out.println(&quot;ERROR: Integer expected instead of &quot; &#43; args);  
  •         return printUsage();  
  •       } catch (ArrayIndexOutOfBoundsException except) {  
  •         System.out.println(&quot;ERROR: Required parameter missing from &quot; &#43;  
  •                            args[i-1]);  
  •         return printUsage();  
  •       }   
  •     }   
  •     // Make sure there are exactly 2 parameters left.   
  •     if (other_args.size() != 2) {  
  •       System.out.println(&quot;ERROR: Wrong number of parameters: &quot; &#43;  
  •                          other_args.size() &#43; &quot; instead of 2.&quot;);  
  •       return printUsage();  
  •     }   
  •     FileInputFormat.setInputPaths(conf, other_args.get(0));  
  •     FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));  
  •            
  •     JobClient.runJob(conf);   
  •     return 0;  
  •   }  


则使用
bin/hadoop jar build/hadoop-*-examples.jar wordcount conf  wordcount_output
可输出一个目录wordcount_output
Java代码

  • $ls wordcount_output/   
  • part-00000_1    part-00000_13   part-00000_16  part-00000_214  part-00000_28  part-00000_38  part-00000_5   part-00000_8   
  • part-00000_10   part-00000_14   part-00000_17  part-00000_22   part-00000_29  part-00000_4   part-00000_6   part-00000_9   
  • part-00000_102  part-00000_141  part-00000_19  part-00000_23   part-00000_3   part-00000_42  part-00000_62   
  • part-00000_11   part-00000_143  part-00000_2   part-00000_24   part-00000_31  part-00000_44  part-00000_63   
  • part-00000_117  part-00000_15   part-00000_20  part-00000_25   part-00000_35  part-00000_46  part-00000_7   
  • part-00000_12   part-00000_152  part-00000_21  part-00000_26   part-00000_36  part-00000_47  part-00000_70  
         版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-137822-1-1.html 上篇帖子: hadoop archive 下篇帖子: hadoop的datanode异常结束
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表