|
package com.lucl.hadoop.mapreduce.part;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
*
* @author luchunli
* @description 自定义OutputFormat
*/
public class ProtocolOutputFormat extends TextOutputFormat<Text, Text> {

    /**
     * RecordWriter that demultiplexes records into one output file per
     * distinct key, named "&lt;key&gt;.txt", inside the task's work directory.
     */
    protected static class ProtocolRecordWriter extends RecordWriter<Text, Text> {
        // All bytes are written as UTF-8; StandardCharsets.UTF_8 is guaranteed
        // to exist, so no UnsupportedEncodingException handling is needed.
        private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        private static final byte[] TAB = "\t".getBytes(StandardCharsets.UTF_8);

        protected TaskAttemptContext context = null;
        // One open stream per key. Keyed by the key's String form because the
        // framework reuses the same mutable Text instance across write() calls;
        // using the Text object itself as a map key would corrupt the map.
        protected HashMap<String, DataOutputStream> recordStream = null;
        protected Path workPath = null;

        public ProtocolRecordWriter() {
        }

        public ProtocolRecordWriter(TaskAttemptContext context, Path workPath) {
            this.context = context;
            this.workPath = workPath;
            recordStream = new HashMap<String, DataOutputStream>();
        }

        /**
         * Appends "key\tvalue\n" (or just the non-null half) to the file
         * dedicated to this key, opening the file on first use.
         *
         * @param key   record key; a null key maps to the file "null.txt"
         * @param value record value; may be null
         * @throws IOException if the per-key file cannot be created or written
         */
        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            boolean nullKey = key == null;
            boolean nullValue = value == null;
            if (nullKey && nullValue) {
                return;
            }
            // String.valueOf(null Text) == "null", matching the original
            // "null.txt" file name produced by string concatenation.
            String keyName = String.valueOf(key);
            DataOutputStream out = recordStream.get(keyName);
            if (null == out) {
                Path file = new Path(workPath, keyName + ".txt");
                // overwrite=false: fail rather than clobber an existing file
                out = file.getFileSystem(this.context.getConfiguration()).create(file, false);
                recordStream.put(keyName, out);
            }
            if (!nullKey) {
                out.write(key.getBytes(), 0, key.getLength());
            }
            if (!(nullKey || nullValue)) {
                out.write(TAB);
            }
            if (!nullValue) {
                out.write(value.getBytes(), 0, value.getLength());
            }
            out.write(NEWLINE);
        }

        /**
         * Closes every per-key stream. All streams are attempted even if one
         * close fails; the first failure is rethrown afterwards so no stream
         * is leaked.
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            IOException firstFailure = null;
            for (DataOutputStream out : recordStream.values()) {
                try {
                    out.close();
                } catch (IOException ioe) {
                    if (firstFailure == null) {
                        firstFailure = ioe;
                    }
                }
            }
            recordStream.clear();
            recordStream = null;
            if (firstFailure != null) {
                throw firstFailure;
            }
        }
    }

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path workPath = this.getTaskOutputPath(context);
        return new ProtocolRecordWriter(context, workPath);
    }

    /**
     * Resolves the directory this task attempt should write into: the
     * committer's temporary work path when a {@link FileOutputCommitter} is in
     * use (output is promoted on commit), otherwise the job's configured
     * output directory.
     *
     * @throws IOException if no output path is configured for the job
     */
    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        OutputCommitter committer = super.getOutputCommitter(context);
        if (committer instanceof FileOutputCommitter) {
            // Directory the task should write results into.
            return ((FileOutputCommitter) committer).getWorkPath();
        }
        // No file committer: fall back to the job-level output directory.
        Path outputPath = getOutputPath(context);
        if (null == outputPath) {
            throw new IOException("Undefined job output-path.");
        }
        return outputPath;
    }
}
|
|
|