package cn.edu.ytu.botao.singletablejoin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Single-table self-join: derives (grandchild, grandparent) pairs from
 * (child, parent) records by joining the table with itself.
 *
 * Input ("child parent" pairs):
 *   Tom  Lucy
 *   Tom  Jack
 *   Lucy Mary
 *   Lucy Ben
 *
 * Left table, emitted reversed (key = parent, value = child):
 *   Lucy Tom
 *   Jack Tom
 *
 * Right table, emitted forward (key = child, value = parent):
 *   Lucy Mary
 *   Lucy Ben
 *
 * After joining on the shared key "Lucy":
 *   Tom Mary
 *   Tom Ben
 *
 * @author botao
 */
public class STjoin {

    /** Guards the one-time "grandChild grandParent" header row in the reducer. */
    private static int time = 0;

    /**
     * Mapper for the self-join. Each input line "child parent" is emitted twice:
     * once keyed by parent with tag "1" (left table, reversed — the value is the
     * child, i.e. a potential grandchild) and once keyed by child with tag "2"
     * (right table, forward — the value is the parent, i.e. a potential
     * grandparent). The shared key is the middle-generation person.
     */
    public static class STJMapper extends Mapper<Object, Text, Text, Text> {

        // Table tags prepended to each value as "tag:name".
        private final Text leftTag = new Text("1");   // left (reversed) table
        private final Text rightTag = new Text("2");  // right (forward) table

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            // Expect exactly two whitespace-separated columns: child, parent.
            // Bounding the loop avoids an ArrayIndexOutOfBoundsException on
            // lines with extra tokens; the i < 2 check skips blank/short lines
            // (the original would NPE on them).
            String[] columns = new String[2];
            int i = 0;
            while (tokenizer.hasMoreTokens() && i < columns.length) {
                columns[i++] = tokenizer.nextToken();
            }
            if (i < 2) {
                return;
            }

            // Skip the header row ("child parent").
            if (!"child".equals(columns[0])) {
                String childName = columns[0];
                String parentName = columns[1];
                // Left table, reversed: key = parent, value carries the child.
                context.write(new Text(parentName),
                        new Text(leftTag.toString() + ":" + childName));
                // Right table, forward: key = child, value carries the parent.
                context.write(new Text(childName),
                        new Text(rightTag.toString() + ":" + parentName));
            }
        }
    }

    /**
     * Reducer for the self-join. For each key (a middle-generation person) it
     * buckets tag-"1" values as grandchildren and tag-"2" values as
     * grandparents, then writes the Cartesian product of the two buckets.
     */
    public static class STJoinReduce extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Lists instead of the original fixed String[20] arrays: no silent
            // overflow when a person has more than 20 relatives.
            List<String> grandChildren = new ArrayList<>();
            List<String> grandParents = new ArrayList<>();

            // Emit the header row exactly once (single-reducer assumption,
            // as in the original).
            if (time == 0) {
                context.write(new Text("grandChild"), new Text("grandParent"));
                time++;
            }

            // Each value is "tag:name"; route it into the matching bucket.
            for (Text text : values) {
                String[] tagged = text.toString().split(":");
                if ("1".equals(tagged[0])) {
                    grandChildren.add(tagged[1]);   // left table: grandchild
                } else if ("2".equals(tagged[0])) {
                    grandParents.add(tagged[1]);    // right table: grandparent
                }
            }

            // Cartesian product: every grandchild paired with every
            // grandparent. (Fixes the original `new Text(grandChild)`, which
            // passed the whole array instead of grandChild[i].)
            for (String grandChild : grandChildren) {
                for (String grandParent : grandParents) {
                    context.write(new Text(grandChild), new Text(grandParent));
                }
            }
        }
    }

    /**
     * Configures and submits the join job.
     *
     * @param args generic Hadoop options followed by: input path, output path
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: STjoin <in> <out>");
            System.exit(2);
        }

        // Hadoop refuses to run if the output directory already exists, so
        // remove it first. Use the real output path (otherArgs[1]) — the
        // original deleted a hardcoded "out" directory instead — and the
        // non-deprecated recursive delete(Path, boolean) overload.
        Path outputPath = new Path(otherArgs[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "STJoin"); // non-deprecated factory
        job.setJarByClass(STjoin.class);
        job.setMapperClass(STJMapper.class);
        job.setReducerClass(STJoinReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}