jilgb 发表于 2015-11-11 10:57:35

【hadoop】reducer输出多个目录

hadoop的reducer输出多个文件

关键字: hadoop,mapreduce
有时候我们想到这样的功能: reducer能根据key(或value)值来输出多个文件,同一key(或value)处于同一个文件中。现在hadoop的0.17.x版本可以重写MultipleOutputFormat的generateFileNameForKeyValue就可以实现此功能。

比如:
Java代码 http://coderplay.javaeye.com/images/icon_copy.gif
package org.apache.hadoop.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * An output format that routes each (key, value) record to a file whose name
 * is derived from the record's value, so records sharing a value land in the
 * same output file (e.g. {@code part-00000_5} for value 5).
 *
 * <p>Relies on {@link MultipleOutputFormat}, which calls
 * {@link #generateFileNameForKeyValue} to pick the leaf file name and
 * {@link #getBaseRecordWriter} to obtain the writer for that file.
 */
public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
    extends MultipleOutputFormat<K, V> {

  /** Lazily created delegate that performs the actual text writing. */
  private TextOutputFormat<K, V> theTextOutputFormat = null;

  /**
   * Returns the underlying text record writer, creating the delegate
   * {@link TextOutputFormat} on first use.
   *
   * @param fs   the file system the output is written to
   * @param job  the job configuration
   * @param name leaf file name produced by {@link #generateFileNameForKeyValue}
   * @param progressable hook for reporting progress while writing
   * @throws IOException if the delegate writer cannot be created
   */
  @Override
  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable progressable) throws IOException {
    if (theTextOutputFormat == null) {
      theTextOutputFormat = new TextOutputFormat<K, V>();
    }
    return theTextOutputFormat.getRecordWriter(fs, job, name, progressable);
  }

  /**
   * Derives the output file name from the value: {@code <name>_<value>}.
   * Every record with the same value is therefore appended to the same file.
   */
  @Override
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name + "_" + value.toString();
  }
}
viewplaincopy
package org.apache.hadoop.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * An output format that routes each (key, value) record to a file whose name
 * is derived from the record's value, so records sharing a value land in the
 * same output file (e.g. {@code part-00000_5} for value 5).
 *
 * <p>Relies on {@link MultipleOutputFormat}, which calls
 * {@link #generateFileNameForKeyValue} to pick the leaf file name and
 * {@link #getBaseRecordWriter} to obtain the writer for that file.
 */
public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
    extends MultipleOutputFormat<K, V> {

  /** Lazily created delegate that performs the actual text writing. */
  private TextOutputFormat<K, V> theTextOutputFormat = null;

  /**
   * Returns the underlying text record writer, creating the delegate
   * {@link TextOutputFormat} on first use.
   *
   * @param fs   the file system the output is written to
   * @param job  the job configuration
   * @param name leaf file name produced by {@link #generateFileNameForKeyValue}
   * @param progressable hook for reporting progress while writing
   * @throws IOException if the delegate writer cannot be created
   */
  @Override
  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable progressable) throws IOException {
    if (theTextOutputFormat == null) {
      theTextOutputFormat = new TextOutputFormat<K, V>();
    }
    return theTextOutputFormat.getRecordWriter(fs, job, name, progressable);
  }

  /**
   * Derives the output file name from the value: {@code <name>_<value>}.
   * Every record with the same value is therefore appended to the same file.
   */
  @Override
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name + "_" + value.toString();
  }
}


试一下wordcount这个例子,把WordCount.java的run函数加上一行
conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);

Java代码 http://coderplay.javaeye.com/images/icon_copy.gif
[*]public int run(String[] args) throws Exception {
[*]    JobConf conf = new JobConf(getConf(), WordCount.class);
[*]    conf.setJobName(&quot;wordcount&quot;);
[*]   
[*]    // the keys are words (strings)
[*]    conf.setOutputKeyClass(Text.class);
[*]    // the values are counts (ints)
[*]    conf.setOutputValueClass(IntWritable.class);
[*]      
[*]    conf.setMapperClass(MapClass.class);         
[*]    conf.setCombinerClass(Reduce.class);
[*]    conf.setReducerClass(Reduce.class);
[*]      
[*]    conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);
[*]      
[*]    List<String> other_args = new ArrayList<String>();
[*]    for(int i=0; i < args.length; &#43;&#43;i) {
[*]      try {
[*]      if (&quot;-m&quot;.equals(args)) {
[*]          conf.setNumMapTasks(Integer.parseInt(args[&#43;&#43;i]));   
[*]      } else if (&quot;-r&quot;.equals(args)) {
[*]          conf.setNumReduceTasks(Integer.parseInt(args[&#43;&#43;i]));   
[*]      } else {
[*]          other_args.add(args);   
[*]      }   
[*]      } catch (NumberFormatException except) {
[*]      System.out.println(&quot;ERROR: Integer expected instead of &quot; &#43; args);
[*]      return printUsage();
[*]      } catch (ArrayIndexOutOfBoundsException except) {
[*]      System.out.println(&quot;ERROR: Required parameter missing from &quot; &#43;
[*]                           args1]);
[*]      return printUsage();
[*]      }   
[*]    }   
[*]    // Make sure there are exactly 2 parameters left.
[*]    if (other_args.size() != 2) {
[*]      System.out.println(&quot;ERROR: Wrong number of parameters: &quot; &#43;
[*]                         other_args.size() &#43; &quot; instead of 2.&quot;);
[*]      return printUsage();
[*]    }   
[*]    FileInputFormat.setInputPaths(conf, other_args.get(0));
[*]    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
[*]         
[*]    JobClient.runJob(conf);   
[*]    return 0;
[*]}


则使用
bin/hadoop jar build/hadoop-*-examples.jar wordcount conf wordcount_output
可输出一个目录wordcount_output
Java代码 http://coderplay.javaeye.com/images/icon_copy.gif
[*]$ls wordcount_output/   
[*]part-00000_1    part-00000_13   part-00000_16   part-00000_214  part-00000_28   part-00000_38   part-00000_5    part-00000_8
[*]part-00000_10   part-00000_14   part-00000_17   part-00000_22   part-00000_29   part-00000_4    part-00000_6    part-00000_9
[*]part-00000_102  part-00000_141  part-00000_19   part-00000_23   part-00000_3    part-00000_42   part-00000_62
[*]part-00000_11   part-00000_143  part-00000_2    part-00000_24   part-00000_31   part-00000_44   part-00000_63
[*]part-00000_117  part-00000_15   part-00000_20   part-00000_25   part-00000_35   part-00000_46   part-00000_7
[*]part-00000_12   part-00000_152  part-00000_21   part-00000_26   part-00000_36   part-00000_47   part-00000_70
         版权声明:本文为博主原创文章,未经博主允许不得转载。
页: [1]
查看完整版本: 【hadoop】reducer输出多个目录