|
0.参考资料:
代码参考1:http://www.pudn.com/downloads212/sourcecode/unix_linux/detail999273.html
理论参考2:http://zhangyu8374.javaeye.com/blog/86307,http://nything.javaeye.com/blog/411787
1.分析
假如有file0,file1,file2三个文件,这些文件中都保存了一些文本内容,比如在file0中只有一个句子,内容为"we are happy"。一般的索引都是记录在这个文件中没有一个单词的索引号。比如file0的索引可以是(we,0),(are,1),(happy,2)。这样的键值对中key是单词,value是这个单词在这个文件中的位置。但是,反向索引刚好相反,对应于多个文件,我们要求出某一个单词在所有这些文件中出现的位置。我们可以按如下操作进行实验:
在本地创建文件夹IndexTest并在里面创建3个文件,每个文件中的内容如下。
* T0 = "it is what it is"
* T1 = "what is it"
* T2 = "it is a banana"
其中T0,T1,T2分别是文件名,后面为文件内容。将IndexTest文件夹上传到DFS中。然后运行反向索引程序。反向索引程序见代码示例。
最后输出结果为:
a (T2, 3)
banana (T2, 4)
is (T2, 2) (T0, 2) (T0, 5) (T1, 2)
it (T1, 3) (T2, 1) (T0, 1) (T0, 4)
what (T0, 3) (T1, 1)
2.代码示例
InvertedIndex.java
View Code
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package pa4;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
*
* @author Ming
*/
public class InvertedIndex {
public static class TokenizerMapper
extends Mapper {
@Override
public void map(Text key, ValuePair value, Context context) throws IOException, InterruptedException {
// TokenInputFormat has generate (word, (fileID, wordPosition))
// so mapper just spill it to reducer
key.set(key.toString().toLowerCase());
context.write(key, value);
}
}
public static class IndexReducer
extends Reducer {
private Text postings = new Text();
@Override
public void reduce(Text key, Iterable values,
Context context) throws IOException, InterruptedException {
String list = "";
for (ValuePair val : values) {
list += " " + val.toString();
}
postings.set(list);
context.write(key, postings);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: InvertedIndex ");
System.exit(2);
}
// remove the old output dir
FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
Job job = new Job(conf, "Inverted Indexer");
job.setJarByClass(InvertedIndex.class);
job.setInputFormatClass(TokenInputFormat.class);
job.setMapperClass(InvertedIndex.TokenizerMapper.class);
//job.setCombinerClass(InvertedIndex.IndexReducer.class);
job.setReducerClass(InvertedIndex.IndexReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ValuePair.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
TokenInputFormat.java
View Code
package pa4;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.LineReader;
import java.util.StringTokenizer;
public class TokenInputFormat extends FileInputFormat {
/**
* Don't allow the files to be split!
*/
@Override
protected boolean isSplitable(JobContext ctx, Path filename) {
// ensure the input files are not splittable!
return false;
}
/**
* Just return the record reader
* key is the docno
*/
public RecordReader createRecordReader(InputSplit split,
TaskAttemptContext ctx)
throws IOException, InterruptedException {
return new TokenRecordReader();
}
public static class TokenRecordReader extends RecordReader {
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private Text line;
private Text key = null;
private ValuePair value = null;
private StringTokenizer tokens = null;
private int tokenPos = 0;
private String fileID = "0"; // input file id that appears in inverted index
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// Assume file name is an integer of file ID
fileID = file.getName();
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
in = new LineReader(fileIn, job);
this.pos = start;
line = new Text();
key = new Text();
value = new ValuePair();
}
public boolean nextKeyValue() throws IOException {
boolean splitEnds = false;
while (tokens == null || !tokens.hasMoreTokens()) {
int lineSize = in.readLine(line, maxLineLength,
Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
maxLineLength));
if (lineSize == 0) {
splitEnds = true;
break;
}
pos += lineSize;
tokens = new StringTokenizer(line.toString(), " /t/n/r/f,.;-?///!'/":=*{}()$[]");
}
if (splitEnds) {
key = null;
value = null;
line = null;
tokens = null;
return false;
} else
return true;
}
@Override
public Text getCurrentKey() {
key.set(tokens.nextToken());
tokenPos ++;
return key;
}
@Override
public ValuePair getCurrentValue() {
value.set(fileID, tokenPos);
return value;
}
/**
* Get the progress within the split
*/
public float getProgress() {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float) (end - start));
}
}
public synchronized void close() throws IOException {
if (in != null) {
in.close();
}
}
}
public static void main(String[] args)
throws IOException {
String fn = args[0];
Configuration conf = new Configuration();
FileSplit split = new FileSplit(new Path(fn), 0, 10000000, null);
TokenRecordReader irr = new TokenRecordReader();
TaskAttemptContext ctx = new TaskAttemptContext(conf,
new TaskAttemptID("hello", 12, true, 12, 12));
irr.initialize(split, ctx);
while (irr.nextKeyValue()) {
System.out.println(irr.getCurrentKey() + ": " + irr.getCurrentValue());
}
}
}
ValuePair.java
View Code
package pa4;
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
import java.io.*;
import org.apache.hadoop.io.*;
/**
*
* @author Ming
*/
public class ValuePair implements WritableComparable {
private Text one;
private IntWritable two;
public void set(Text first, IntWritable second) {
one = first;
two = second;
}
public void set(String first, int second) {
one.set(first);
two.set(second);
}
public ValuePair() {
set(new Text(), new IntWritable());
}
public ValuePair(Text first, IntWritable second) {
set(first, second);
}
public ValuePair(String first, int second) {
set(first, second);
}
public Text getFirst() {
return one;
}
public IntWritable getSecond() {
return two;
}
@Override
public void write(DataOutput out) throws IOException {
one.write(out);
two.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
one.readFields(in);
two.readFields(in);
}
@Override
public int hashCode() {
return one.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof ValuePair) {
ValuePair tp = (ValuePair)o;
return one.equals(tp.one);
}
return false;
}
@Override
public String toString() {
return "(" + one + ", " + two + ")";
}
@Override
public int compareTo(ValuePair tp) {
int cmp = one.compareTo(tp.one);
if (cmp != 0) {
return cmp;
}
return two.compareTo(tp.two);
}
public static class Comparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
private static final IntWritable.Comparator INT_COMPARATOR = new IntWritable.Comparator();
public Comparator() {
super(ValuePair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int oneL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int oneL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, oneL1, b2, s2, oneL2);
if (cmp != 0) {
return cmp;
}
return INT_COMPARATOR.compare(b1, s1+oneL1, l1-oneL1,
b2, s2+oneL2, l2-oneL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a instanceof ValuePair && b instanceof ValuePair) {
return ((ValuePair) a).compareTo((ValuePair) b);
}
return super.compare(a, b);
}
}
static {
WritableComparator.define(ValuePair.class, new Comparator());
}
}
ps:2012-5-20
这里键值对valuepair的运用让我想到了前几天写的Hashmap实现原理。在hashmap的实现过程中,也运用了键值对类Entry。 两者之间有共通之处,有空可以再改进Hashmap实现原理。
|
|