package org.robby.join;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MapJoinWithCache {
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        private Text flag = new Text();
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;
        // keyMap holds the cached file's records so every map() call can share them
        private HashMap<String, String> keyMap = null;

        // map() is called once per input line and looks each record up in keyMap.
        // setup() has already loaded the cached input1 file, so only input2 is processed here.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            keyValue = value.toString().split(",", 2);
            String name = keyMap.get(keyValue[0]);
            this.key.set(keyValue[0]);
            String output = name + "," + keyValue[1];
            this.value.set(output);
            context.write(this.key, this.value);
        }
        // setup() runs once when the Mapper is initialized, before any map() calls
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            // the cache file paths registered on the job are available from the context
            URI[] localPaths = context.getCacheFiles();
            keyMap = new HashMap<String, String>();
            for (URI url : localPaths) {
                // open the HDFS file system via its URI
                FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop1:9000"), context.getConfiguration());
                FSDataInputStream in = null;
                // open the cached file on HDFS: wrap the URI in a Path and get an input stream
                in = fs.open(new Path(url.getPath()));
                BufferedReader br = new BufferedReader(new InputStreamReader(in));
                String s1 = null;
                while ((s1 = br.readLine()) != null) {
                    keyValue = s1.split(",", 2);
                    keyMap.put(keyValue[0], keyValue[1]);
                    System.out.println(s1);
                }
                br.close();
            }
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // all the join work happens in the mapper; the reducer just writes out the grouped values
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            for (Text val : values)
                context.write(key, val);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinWithCache.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath, true);
        // everything else is a standard job setup; here the file to be cached is also
        // registered on the job. Multiple files can be added, each by its full path.
        job.addCacheFile(new Path(args[2]).toUri());
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
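// Usage sketch (illustrative, not from the source): assuming the job is packaged as
// mapjoin.jar and both input files use the comma-separated "id,rest" layout parsed above,
// the driver could be launched as:
//
//   hadoop jar mapjoin.jar org.robby.join.MapJoinWithCache \
//       /input/orders /output/join /input/users.txt
//
// args[0] is the large input split across mappers, args[1] the output directory
// (deleted first if it already exists), and args[2] the small file shipped to every
// mapper via job.addCacheFile() and loaded into keyMap in setup().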