|
2012st26
倒排索引概念
倒排索引(Inverted index),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。是文档检索中常用的数据结构。和索引的目的是一样的,倒排索引是为了提高检索效率而设计的。
需要做倒排索引的文档通常都是数量巨大的,但是对不同的文档做索引可以并行,这事mapreduce最擅长做了,这就是我们今天所做的事。
设计说明:
实验的输入是一组文档,输出的每行是:term 5@doc1 6@doc2 …
即每行输出一个单词,后面接着是该单词在某文档中的出现次数,每行中文档的是按序存放的,整个结果中单词也是排好序的。
基于以上要求,各个类的设计如下:
InvertedIndexMapper:
由于读入的每行文本中包含大量的非单词字符,因此我们需要对此进行处理,此处的处理办法是将每个非单词字符([^0-9a-zA-Z])替换为空格,另外文档中大小写的问题也需要处理,我们在这儿统一将其转化为小写。另外在这一阶段需要做的事就是获取文件名,当然我们也可以自己定义一个InputFormat和RecordReader实现同样的效果,单是获取文件名我们可以采取这种相对简单的方式。剩下的事情就是提取单词进行发射:由于我们需要对出现某个单词的所有文件的文件名进行排序,可以利用系统自动对此进行排序,方法是将文件名和单词合并成这样的形式word@docid,最后我们在map阶段发射的键值对就是(word@docid, 1)。
InvertedIndexCombiner:
这个类所做的事就是对map发射的结果进行简单的合并,用wordcount里面的那个reduce类就可以。
InvertedIndexPartitioner:
在wordcount里面是没有这个类的,我们之所以要在这儿定制自己的partitioner,是因为我们发射过来的键值对是(word@docid, appeartime)形式的,也就是说word@doc1和word@doc2是不同的key,也就意味着可能会分配到不同的Reducer上。解决方案是将组合键 临时拆开,“蒙骗”partitioner按照而不是进行分区选择正确的Reducer,这样可保证同一个word下的一组键值对一定被分区到同一个Reducer。
InvertedIndexReducer:
Reducer完成的工作就是对相同key的键值对进行合并,即将来的(word@doc1, appeartime1)、(word@doc1, appeartime2)……合并为(word appeartime1@doc1 appeartime2@doc2 ……),采取的方案是使用一个vector和全局的oldword,处理逻辑如下:
if(oldword == null || oldword != word){
if(oldword != null){
发射(word, all_string_in_vector);
}
vector清空;
oldword = word;
}
将appeartime@docid加入vector;
整个reduce工作完成后,还有最后一组索引没有发射出去,也就是说我们要自己完成清理工作,只需要重写Reducer类中的cleanup方法即可,此方法会在整个reduce完成之后被调用,我们在里面发射最后一组索引。
具体代码:
View Code
1 import java.io.IOException;
2 import java.util.StringTokenizer;
3 import java.util.Vector;
4
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.FileSystem;
7 import org.apache.hadoop.fs.Path;
8 import org.apache.hadoop.io.IntWritable;
9 import org.apache.hadoop.io.LongWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19 import org.apache.hadoop.util.GenericOptionsParser;
20
21 import com.sun.org.apache.bcel.internal.generic.NEW;
22 /*
23 *代码版权:2012st26
24 *by:backingwu
25 */
26 //map阶段输出为 key@DocID, value
27 public class InvertedIndex {
28
29 private static Text oldkey = null;
30 private static Vector vector1 = new Vector();
31
32 public static class InvertedIndexMapper extends
33 Mapper{
34 private final static IntWritable one = new IntWritable(1);
35 public void map(LongWritable key, Text value, Context context)
36 throws IOException, InterruptedException{
37 //获取文件名及预处理
38 FileSplit fileSplit = (FileSplit)context.getInputSplit();
39 String fileName = fileSplit.getPath().getName();
40 String wordlineString = new String(value.toString().replaceAll("[^1-9a-zA-Z]", " "));
41 StringTokenizer itr = new StringTokenizer(wordlineString.toLowerCase());
42
43 while(itr.hasMoreTokens()){
44 String tempKey = itr.nextToken();
45 String temp2 = tempKey + "@" + fileName;
46 context.write(new Text(temp2), one);
47 }
48 }
49 }
50
51 public static class InvertedIndexCombiner extends Reducer{
52 private IntWritable result = new IntWritable();
53
54 public void reduce(Text key, Iterable values,
55 Context context) throws IOException, InterruptedException {
56 int sum = 0;
57 for (IntWritable val : values) {
58 sum += val.get();
59 }
60 result.set(sum);
61 context.write(key, result);
62 }
63 }
64
65 public static class InvertedIndexPartioner extends HashPartitioner{
66
67 public int getPartition(Text key, IntWritable value, int numReduceTasks) {
68 // TODO Auto-generated method stub
69
70 String term = key.toString().split("@")[0];
71 super.getPartition(new Text(term), value, numReduceTasks);
72 return 0;
73 }
74
75 }
76
77 public static class InvertedIndexReducer extends Reducer{
78 //reduce阶段输出为 key \tt1@doc1\tt2@doc2\tt3@doc3...
79 public void reduce(Text key, Iterable values, Context context
80 )throws IOException, InterruptedException{
81 String[] temp = key.toString().split("@");
82 Text key1 = new Text(temp[0]);
83 int sum = 0;
84 for (IntWritable val : values) {
85 sum += val.get();
86 }
87 Text valueText = new Text(String.valueOf(sum));
88 if(oldkey == null || !oldkey.equals(key1) ){
89 if(oldkey != null){
90 //发射
91 StringBuffer tmpString = new StringBuffer();
92 for(Text t: vector1){
93 tmpString.append(t.toString());
94 }
95 context.write(oldkey, new Text(tmpString.toString()));
96 }
97
98 vector1.clear();
99 vector1.add(new Text(new String(" " + '\t')));
100 oldkey = key1;
101 }
102 Text addText = new Text(valueText + "@" + temp[1] + '\t');
103 vector1.add(addText);
104 }
105
106
107
108 //reduce阶段的清理工作
109 protected void cleanup(Context context)
110 throws IOException, InterruptedException{
111 StringBuffer tmpString = new StringBuffer();
112 for(Text t: vector1){
113 tmpString.append(t.toString());
114 }
115 context.write(oldkey, new Text(tmpString.toString()));
116 }
117
118 }
119
120 public static void main(String[] args){
121 try{
122 Configuration conf = new Configuration();
123 String[] otherArgs = new GenericOptionsParser(conf, args)
124 .getRemainingArgs();
125 if (otherArgs.length != 2) {
126 System.err.println("Usage: InvertedIndex ");
127 System.exit(2);
128 }
129 Job job = new Job(conf, "invert index");
130 job.setJarByClass(InvertedIndex.class);
131 job.setInputFormatClass(TextInputFormat.class);
132 job.setMapperClass(InvertedIndexMapper.class);
133 job.setReducerClass(InvertedIndexReducer.class);
134 job.setCombinerClass(InvertedIndexCombiner.class);
135 job.setMapOutputKeyClass(Text.class);
136 job.setMapOutputValueClass(IntWritable.class);
137 job.setOutputKeyClass(Text.class);
138 job.setOutputValueClass(Text.class);
139 job.setPartitionerClass(InvertedIndexPartioner.class);
140 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
141 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
142 FileSystem.get(conf).delete(new Path(otherArgs[1]));
143 System.exit(job.waitForCompletion(true) ? 0 : 1);
144 }catch(IOException e){
145 e.printStackTrace();
146 }catch(InterruptedException e){
147 e.printStackTrace();
148 }catch(ClassNotFoundException e){
149 e.printStackTrace();
150 }
151
152 }
153 }
本文来自大笤帚---扫除一切障碍,奋勇向前冲的大笤帚! |
|
|