【hadoop】reducer输出多个目录
hadoop的reducer输出多个文件。关键字: hadoop, mapreduce
有时候我们想到这样的功能: reducer能根据key(或value)值来输出多个文件,同一key(或value)处于同一个文件中。现在hadoop的0.17.x版本可以重写MultipleOutputFormat的generateFileNameForKeyValue就可以实现此功能。
比如:
Java代码 http://coderplay.javaeye.com/images/icon_copy.gif
package org.apache.hadoop.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * An {@link MultipleOutputFormat} subclass that writes records as text and
 * derives each output file's name from the record's value, so all records
 * sharing the same value land in the same file (e.g. part-00000_5).
 */
public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
    extends MultipleOutputFormat<K, V> {

  // Lazily created delegate that does the actual text serialization.
  private TextOutputFormat<K, V> theTextOutputFormat = null;

  /**
   * Returns a text RecordWriter for the given leaf file, creating the
   * shared TextOutputFormat delegate on first use.
   *
   * @param fs       the file system the output goes to
   * @param job      the job configuration
   * @param name     the leaf file name chosen by generateFileNameForKeyValue
   * @param progress progress reporter passed through to the delegate
   * @throws IOException if the underlying writer cannot be created
   */
  @Override
  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable progress) throws IOException {
    if (theTextOutputFormat == null) {
      theTextOutputFormat = new TextOutputFormat<K, V>();
    }
    return theTextOutputFormat.getRecordWriter(fs, job, name, progress);
  }

  /**
   * Appends the record's value to the base leaf name, producing one output
   * file per distinct value (e.g. "part-00000" + "_" + value).
   */
  @Override
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name + "_" + value.toString();
  }
}
（以下代码清单与上方相同，系页面复制产生的重复；内容一致，可跳过。）
package org.apache.hadoop.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * Text-based multiple-file output format: the name of the file a record is
 * written to is built from the record's value, grouping equal values into
 * one file.
 */
public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
    extends MultipleOutputFormat<K, V> {

  // Created on first request and reused for every subsequent writer.
  private TextOutputFormat<K, V> theTextOutputFormat = null;

  /** Hands out a writer from the (lazily built) TextOutputFormat delegate. */
  @Override
  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {
    TextOutputFormat<K, V> delegate = theTextOutputFormat;
    if (delegate == null) {
      delegate = new TextOutputFormat<K, V>();
      theTextOutputFormat = delegate;
    }
    return delegate.getRecordWriter(fs, job, name, arg3);
  }

  /** Leaf file name = base name + "_" + record value. */
  @Override
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name + "_" + value.toString();
  }
}
试一下wordcount这个例子,把WordCount.java的run函数加上一行
conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);
即
Java代码 http://coderplay.javaeye.com/images/icon_copy.gif
[*]public int run(String[] args) throws Exception {
[*] JobConf conf = new JobConf(getConf(), WordCount.class);
[*] conf.setJobName("wordcount");
[*]
[*] // the keys are words (strings)
[*] conf.setOutputKeyClass(Text.class);
[*] // the values are counts (ints)
[*] conf.setOutputValueClass(IntWritable.class);
[*]
[*] conf.setMapperClass(MapClass.class);
[*] conf.setCombinerClass(Reduce.class);
[*] conf.setReducerClass(Reduce.class);
[*]
[*] conf.setOutputFormat(org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.class);
[*]
[*] List<String> other_args = new ArrayList<String>();
[*] for(int i=0; i < args.length; ++i) {
[*] try {
[*] if ("-m".equals(args)) {
[*] conf.setNumMapTasks(Integer.parseInt(args[++i]));
[*] } else if ("-r".equals(args)) {
[*] conf.setNumReduceTasks(Integer.parseInt(args[++i]));
[*] } else {
[*] other_args.add(args);
[*] }
[*] } catch (NumberFormatException except) {
[*] System.out.println("ERROR: Integer expected instead of " + args);
[*] return printUsage();
[*] } catch (ArrayIndexOutOfBoundsException except) {
[*] System.out.println("ERROR: Required parameter missing from " +
[*] args1]);
[*] return printUsage();
[*] }
[*] }
[*] // Make sure there are exactly 2 parameters left.
[*] if (other_args.size() != 2) {
[*] System.out.println("ERROR: Wrong number of parameters: " +
[*] other_args.size() + " instead of 2.");
[*] return printUsage();
[*] }
[*] FileInputFormat.setInputPaths(conf, other_args.get(0));
[*] FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
[*]
[*] JobClient.runJob(conf);
[*] return 0;
[*]}
则使用
bin/hadoop jar build/hadoop-*-examples.jar wordcount conf wordcount_output
可输出一个目录wordcount_output
Java代码 http://coderplay.javaeye.com/images/icon_copy.gif
$ ls wordcount_output/
part-00000_1   part-00000_13  part-00000_16 part-00000_214 part-00000_28 part-00000_38 part-00000_5  part-00000_8
part-00000_10  part-00000_14  part-00000_17 part-00000_22  part-00000_29 part-00000_4  part-00000_6  part-00000_9
part-00000_102 part-00000_141 part-00000_19 part-00000_23  part-00000_3  part-00000_42 part-00000_62
part-00000_11  part-00000_143 part-00000_2  part-00000_24  part-00000_31 part-00000_44 part-00000_63
part-00000_117 part-00000_15  part-00000_20 part-00000_25  part-00000_35 part-00000_46 part-00000_7
part-00000_12  part-00000_152 part-00000_21 part-00000_26  part-00000_36 part-00000_47 part-00000_70
版权声明:本文为博主原创文章,未经博主允许不得转载。
页:
[1]