这两天在网上看了一篇关于MapReduce多文件输出的帖子:http://blog.csdn.net/inkfish ,写得不错。
我试着实现了一下,同样分为三个文件,不过跟原作者的稍有不同:其中有些类是我以前写的,直接拷贝了过来。
My_LineRead.java
public class My_LineRead<K, V> extends RecordWriter<K, V>{private static final String utf8 = "UTF-8";private static final String colon = "----"; //划分符号private static final byte[] newline;static {try {newline = "/n".getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException("can't find " + utf8 + " encoding");}}protected DataOutputStream out;private final byte[] keyValueSeparator;public My_LineRead(DataOutputStream out) {this(out, colon); //调用下面的构造函数}public My_LineRead(DataOutputStream out, String keyValueSeparator) {// TODO Auto-generated constructor stubthis.out = out;try {this.keyValueSeparator = keyValueSeparator.getBytes(utf8);} catch (UnsupportedEncodingException e) {// TODO Auto-generated catch blockthrow new IllegalArgumentException("can't find " + utf8 + " encoding");}}@Overridepublic void close(TaskAttemptContext arg0) throws IOException,InterruptedException {// TODO Auto-generated method stubout.close();}@Overridepublic void write(K key, V value) throws IOException,InterruptedException {if (!(key == null && key instanceof NullWritable)){//如果key不为空者输出keyif ((Object)key instanceof Text){Text to = (Text) key;out.write(to.getBytes(), 0, to.getLength());}else{out.write(key.toString().getBytes(utf8));}out.write(keyValueSeparator);}if (!(value == null && value instanceof NullWritable)){//如果value不为空则输出valueif ((Object)value instanceof Text){Text to = (Text) value;out.write(to.getBytes(), 0, to.getLength());}else{out.write(value.toString().getBytes(utf8));}out.write(newline);}}}
MyMultipleOutputFormat.java //这个类,我添加了些注释便于理解
public abstract class MyMultipleOutputFormat <K extends WritableComparable<?>, V extends Writable> extends FileOutputFormat<K, V> {//接口类,需要在主程序中实现generateFileNameForKeyValue来获取文件名private MultiRecordWriter writer = null; @Overridepublic RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)throws IOException, InterruptedException {// TODO Auto-generated method stub//如果第一次调用那么writer=nullif (writer == null) { //getTaskOutputPath获取output路径writer = new MultiRecordWriter(job, getTaskOutputPath(job)); } return writer;}private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {Path workPath = null;OutputCommitter committer = super.getOutputCommitter(conf);if (committer instanceof FileOutputCommitter) {workPath = ((FileOutputCommitter) committer).getWorkPath();} else {Path outputPath = super.getOutputPath(conf);if (outputPath == null) {throw new IOException("Undefined job output-path");}workPath = outputPath;}return workPath;}/**通过key, value, conf来确定输出文件名(含扩展名)*///返回值就是文件名。可以根据key,value来判断protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);//MultiRecordWriter类public class MultiRecordWriter extends RecordWriter<K, V> {/**RecordWriter的缓存*/private HashMap<String, RecordWriter<K, V>> recordWriters = null;private TaskAttemptContext job = null;/**输出目录*/private Path workPath = null;//构造函数public MultiRecordWriter(TaskAttemptContext job, Path workPath) {super();this.job = job;this.workPath = workPath;recordWriters = new HashMap<String, RecordWriter<K, V>>();}//关闭,应该可能是多个文件进行关闭,所有采用循环//recordWriters.values() 就是指的getBaseRecordWriter返回的值。@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();while (values.hasNext()) {values.next().close(context);}this.recordWriters.clear();}@Overridepublic void write(K key, V value) throws IOException, InterruptedException {//得到输出文件名String baseName = 
generateFileNameForKeyValue(key, value, job.getConfiguration());//如果recordWriters里没有文件名,那么就建立。否则就直接写值。RecordWriter<K, V> rw = this.recordWriters.get(baseName);if (rw == null) {rw = getBaseRecordWriter(job, baseName);//放入HashMapthis.recordWriters.put(baseName, rw);}rw.write(key, value);}// ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)throws IOException, InterruptedException {//获取配置文件Configuration conf = job.getConfiguration();//查看是否使用解码器boolean isCompressed = getCompressOutput(job);String keyValueSeparator = ",";RecordWriter<K, V> recordWriter = null;if (isCompressed) {Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,GzipCodec.class);CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);Path file = new Path(workPath, baseName + codec.getDefaultExtension());FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);recordWriter = new My_LineRead<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);}//如果不使用解码器else {Path file = new Path(workPath, baseName);FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);//recordWriter = new My_LineRead<K, V>(fileOut, keyValueSeparator);//这里我使用的我自己的OutputFormatrecordWriter = new My_LineRead<K, V>(fileOut);}return recordWriter;}}}
最后就是测试类,WordCount_MulFileOut.java
public class WordCount_MulFileOut {public static class wordcountMapper extendsMapper<LongWritable, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{String line = value.toString();StringTokenizer itr = new StringTokenizer(line);while(itr.hasMoreElements()){word.set(itr.nextToken());context.write(word, one);}}}public static class wordcountReduce extendsReducer<Text, IntWritable, Text, IntWritable>{public void reduce(Text key, Iterable<IntWritable>values, Context context)throws IOException, InterruptedException{int sum = 0;for (IntWritable str : values){sum += str.get();}context.write(key, new IntWritable(sum));}}public static class MyMultiple extends MyMultipleOutputFormat{@Overrideprotected String generateFileNameForKeyValue(WritableComparable key,Writable value, Configuration conf) {// TODO Auto-generated method stubreturn "other.txt";}}public static void main(String args[])throws Exception{Configuration conf = new Configuration();Job job = new Job(conf, "wordcount");job.setJarByClass(WordCount_MulFileOut.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(MyMultiple.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setMapperClass(wordcountMapper.class);job.setReducerClass(wordcountReduce.class);job.setCombinerClass(wordcountReduce.class);FileInputFormat.setInputPaths(job, new Path(args[1]));FileOutputFormat.setOutputPath(job, new Path(args[2]));job.waitForCompletion(true);}} |