xiu12 发表于 2018-10-30 06:51:18

Hadoop2.6.0学习笔记(七)MapReduce分区

package com.lucl.hadoop.mapreduce.part;  

  
import java.io.DataOutputStream;
  
import java.io.IOException;
  
import java.io.UnsupportedEncodingException;
  
import java.util.HashMap;
  

  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.OutputCommitter;
  
import org.apache.hadoop.mapreduce.RecordWriter;
  
import org.apache.hadoop.mapreduce.TaskAttemptContext;
  
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  

  
/**
  
*
  
* @author luchunli
  
* @description 自定义OutputFormat
  
*/
  
public class ProtocolOutputFormat extends TextOutputFormat {
  
    protected static class ProtocolRecordWriter extends RecordWriter {
  
      private static final String utf8 = "UTF-8";
  
      private static final byte[] newline;
  
      static {
  
          try {
  
            newline = "\n".getBytes(utf8);
  
          } catch (UnsupportedEncodingException uee) {
  
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  
          }
  
      }
  

  
      protected TaskAttemptContext context = null;
  

  
      protected HashMap recordStream = null;
  
      protected Path workPath = null;
  

  
      public ProtocolRecordWriter () {}
  

  
      public ProtocolRecordWriter (TaskAttemptContext context, Path workPath) {
  
            this.context = context;
  
            this.workPath = workPath;
  
            recordStream = new HashMap();
  
      }
  

  
      @Override
  
      public void write(Text key, Text value) throws IOException, InterruptedException {
  
            boolean nullKey = key == null;
  
            boolean nullValue = value == null;
  
            if (nullKey && nullValue) {
  
                return;
  
            }
  
            DataOutputStream out = recordStream.get(key);
  
            if (null == out) {
  
                  Path file = new Path(workPath, key + ".txt");
  
                  out = file.getFileSystem(this.context.getConfiguration()).create(file, false);
  
                  recordStream.put(key, out);
  
            }
  
            if (!nullKey) {
  
               out.write(key.getBytes(), 0, key.getLength());
  
            }
  
            if (!(nullKey || nullValue)) {
  
                out.write("\t".getBytes());
  
            }
  
            if (!nullValue) {
  
               out.write(value.getBytes(), 0, value.getLength());
  
            }
  
            out.write(newline);
  
      }
  

  
      @Override
  
      public void close(TaskAttemptContext context) throws IOException,
  
                InterruptedException {
  
            for (DataOutputStream out : recordStream.values()) {
  
                out.close();
  
            }
  
            recordStream.clear();
  
            recordStream = null;
  
      }
  
    }
  

  
    @Override
  
    public RecordWriter getRecordWriter(TaskAttemptContext context)
  
            throws IOException, InterruptedException {
  
      Path workPath = this.getTaskOutputPath(context);
  
      return new ProtocolRecordWriter(context, workPath);
  
    }
  

  
    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
  
      Path workPath = null;
  
      OutputCommitter committer = super.getOutputCommitter(context);
  

  
      if (committer instanceof FileOutputCommitter) {
  
            // Get the directory that the task should write results into.
  
            workPath = ((FileOutputCommitter) committer).getWorkPath();
  
      } else {
  
            // Get the {@link Path} to the output directory for the map-reduce job.
  
            // context.getConfiguration().get(FileOutputFormat.OUTDIR);
  
            Path outputPath = super.getOutputPath(context);
  
            if (null == outputPath) {
  
                throw new IOException("Undefined job output-path.");
  
            }
  
            workPath = outputPath;
  
      }
  

  
      return workPath;
  
    }
  
}


页: [1]
查看完整版本: Hadoop2.6.0学习笔记(七)MapReduce分区