|
有一批电话通信清单,保存了主叫和被叫的记录,记录格式下,主叫和被叫之间是以空格隔开的。
13400001111 10086
13500002222 10000
13600003333 114
13700004444 12580
13711111111 10086
13822222222 12580
13922225555 12580
18622220000 114
18800000000 114
现在需要做一个倒排索引,记录拨打给被叫的所有主叫号码,记录的格式如下,主叫号码之间以|分隔。
10000 13500002222|
10086 13400001111|13711111111|
114 13600003333|18622220000|18800000000|
12580 13700004444|13822222222|13922225555|
1、算法思路
源文件——》Mapper(分隔原始数据,以被叫作为key,以主叫作为value)——》Reducer(把拥有相同被叫的主叫号码用|分隔汇总)——》输出到HDFS
2、Hadoop程序
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class App_2 extends Configured implements Tool{
@Override
public int run(String[] arg0) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf,"App_2");
job.setJarByClass(App_2.class);
FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
job.setMapperClass(CallMapper.class);
job.setReducerClass(CallReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
enum Counter{
SKIPLINE,//记录出错的行数
}
/**
*Mapper
*LongWritable,Text 是输入数据的key和value 如:清单的每一行的首字符的偏移量作为key,整一行的内容作为value
*Text,Text 是输出数据的key和value
*
*/
public static class CallMapper extends Mapper {
//map(LongWritable key,Text value,Context context)
//LongWritable key,Text value,和CallMapper类的输入数据的key、value对应
//Context 上下文环境
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
try {
String line = value.toString();
String[] call = line.split(" ");
String caller = call[0];//主叫
String callee = call[1];//被叫
Text outKey = new Text(callee);
Text outValue = new Text(caller);
context.write(outKey, outValue);//被叫作为key,主叫作为value输出
} catch(ArrayIndexOutOfBoundsException e) {
context.getCounter(Counter.SKIPLINE).increment(1);//出错,行数+1
return;
}
}
}
/**
*Reducer
*Text,Text,是输入数据的key和value,对应Mapper中的输出数据
*Text,Text 是最终输出数据的key和value
*
*/
public static class CallReducer extends Reducer{
//reduce(Text key,Text value,Context context)
//Text key,Iterable values,和CallMapper类的输出数据的key、value对应,其中values是对应key的所有主叫的集合
//Context 上下文环境
public void reduce(Text key,Iterable values,Context context) throws IOException, InterruptedException {
String result = "";
String temp = "";
//对主叫用|分隔
for(Text value : values) {
temp = value.toString();
result += (temp + "|");
}
Text outValue = new Text(result);
//最终输出:被叫 用|分隔的主叫
context.write(key, outValue);
}
}
public static void main(String[] args) throws Exception{
int res = ToolRunner.run(new Configuration(), new App_2(), args);
System.exit(res);
}
}
3、可以在eclipse中运行程序,输入两个参数,一个是通话清单文件所在路径,一个是结果输出目录
4、也可以将程序打成jar包,用命令执行。
[coder@h1 hadoop-0.20.2]$ bin/hadoop jar /home/coder/call.jar /user/coder/in/call.txt /user/coder/output
注意:/user/coder/in/call.txt 和/user/coder/output都是HDFS中的路径
|
|