1. Map stage
Input: MovieID,UserID,Rating,Date
Output: <MovieID, "Rating,Date"> (the movie ID as the key; the rating and date, comma-separated, as the value)
import java.io.*;
import java.util.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
public class MyMapper {
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, Text> {
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
// Convert the Text value to a String
String line = value.toString();
// Each line is one movie rating record, "movieID,userID,rating,date",
// with fields separated by ","
StringTokenizer itr = new StringTokenizer(line, ",");
String name = itr.nextToken();
// Use the movieID as the key
word.set(name);
// ratingAndDate holds this record's rating and date
String ratingAndDate = "";
// Skip the userID
itr.nextToken();
ratingAndDate = itr.nextToken();
ratingAndDate += "," + itr.nextToken();
// Emit the <movieID, "rating,date"> pair to the reducer
output.collect(word, new Text(ratingAndDate));
}
}
}
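To see what the mapper emits, here is a minimal standalone sketch (not part of the job; the class name and the sample record are made up for illustration) that runs the same parsing on one input line:
import java.util.StringTokenizer;
public class MapLogicDemo {
public static void main(String[] args) {
// A hypothetical "movieID,userID,rating,date" record
String line = "1,1488844,3,2005-09-06";
StringTokenizer itr = new StringTokenizer(line, ",");
String movieID = itr.nextToken();
itr.nextToken(); // skip userID
String ratingAndDate = itr.nextToken() + "," + itr.nextToken();
// Prints the pair the mapper would emit: 1	3,2005-09-06
System.out.println(movieID + "\t" + ratingAndDate);
}
}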
2. Reduce stage
Input: <MovieID, list of "Rating,Date" values>
Output: <MovieID, "firstDate,lastDate,prodDate,ratingCount,avgRating,title">
import java.io.IOException;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
// The Reducer implementation
public class MyReducer {
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
// Local paths of the files in the distributed cache
Path[] localFiles = new Path[0];
// movieTitles maps each movieID to its record from movie_titles.txt
HashMap<String, String> movieTitles = new HashMap<String, String>();
public void configure(JobConf job) {
if(job.getBoolean("netflixDriver.distributedCacheFile", false)) {
// Get the local paths of the cached files
try {
localFiles = DistributedCache.getLocalCacheFiles(job);
}
catch (IOException ioe) {
System.err.println("Caught exception while getting cached files " + StringUtils.stringifyException(ioe));
}
// If a file is present in the distributed cache
if (localFiles.length > 0) {
try {
// movie_titles.txt is the file held in the distributed cache
BufferedReader reader = new BufferedReader(new FileReader(localFiles[0].toString()));
// Holds one line of the cached file
String cachedLine = "";
while ((cachedLine = reader.readLine()) != null) {
StringTokenizer cachedIterator = new StringTokenizer(cachedLine, ",");
// Get the movieID
String movieID = cachedIterator.nextToken();
// Get the rest of the line (the release year, then the title, which may itself contain commas)
String dateAndTitle = cachedIterator.nextToken();
while(cachedIterator.hasMoreTokens())
{
dateAndTitle += "," + cachedIterator.nextToken();
}
movieTitles.put(movieID, dateAndTitle);
}
reader.close();
} catch (IOException ioe) {
System.err.println("Caught Exception while parsing the cached file " + StringUtils.stringifyException(ioe));
}
}
}
}
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
int firstDate = 0;
int lastDate = 0;
double rating = 0.0;
int ratingCount = 0;
String line;
String dateStr = "";
while(values.hasNext()) {
line = values.next().toString();
StringTokenizer itr = new StringTokenizer(line, ",");
rating += Integer.parseInt(itr.nextToken());
dateStr = itr.nextToken();
dateStr = dateStr.replaceAll("-","");
// Initialize both dates from the first record
if(firstDate == 0) {
firstDate = Integer.parseInt(dateStr);
lastDate = firstDate;
}
if(Integer.parseInt(dateStr) > lastDate) {
lastDate = Integer.parseInt(dateStr);
}
if(Integer.parseInt(dateStr) < firstDate) {
firstDate = Integer.parseInt(dateStr);
}
ratingCount++;
}
// Look up the cached movie record ("prodDate,title") for this movieID
String movieInfo = movieTitles.get(key.toString());
StringTokenizer tokenizer = new StringTokenizer(movieInfo, ",");
String prodDate = tokenizer.nextToken();
String movieTitle = tokenizer.nextToken();
while(tokenizer.hasMoreTokens())
{
movieTitle += "," + tokenizer.nextToken();
}
// Compute the movie's average rating
rating = rating/ratingCount;
String dateRange = Integer.toString(firstDate) + "," + Integer.toString(lastDate);
dateRange += "," + prodDate;
dateRange += "," + ratingCount;
dateRange += "," + rating;
dateRange += "," + movieTitle;
Text dateRangeText = new Text(dateRange);
// Emit the result
output.collect(key, dateRangeText);
}
}
}
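The aggregation is easiest to follow on concrete values. This minimal standalone sketch (the class name and sample values are hypothetical) reproduces the reducer's date-range and average-rating logic for a single movie:
import java.util.StringTokenizer;
public class ReduceLogicDemo {
public static void main(String[] args) {
String[] values = {"3,2005-09-06", "5,2004-01-19"};
int firstDate = 0, lastDate = 0, ratingCount = 0;
double rating = 0.0;
for (String v : values) {
StringTokenizer itr = new StringTokenizer(v, ",");
rating += Integer.parseInt(itr.nextToken());
// Normalize "2005-09-06" to the comparable integer 20050906
int date = Integer.parseInt(itr.nextToken().replaceAll("-", ""));
if (firstDate == 0) { firstDate = date; lastDate = date; }
if (date > lastDate) lastDate = date;
if (date < firstDate) firstDate = date;
ratingCount++;
}
// Prints: 20040119,20050906,2,4.0
System.out.println(firstDate + "," + lastDate + "," + ratingCount + "," + (rating / ratingCount));
}
}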
3. Main program
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.taobao.MyMapper;
import com.taobao.MyReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.filecache.DistributedCache;
public class netflixDriver extends Configured implements Tool {
static int printUsage() {
System.out.println("netflixDriver [-m ] [-r ] ");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), MyMapper.class);
conf.setJobName("netflixDriver");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(MyMapper.MapClass.class);
conf.setReducerClass(MyReducer.Reduce.class);
List<String> other_args = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
if ("-m".equals(args)) {
conf.setNumMapTasks(Integer.parseInt(args[++i]));
} else if ("-r".equals(args)) {
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
} else if ("-d".equals(args)) {
DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
conf.setBoolean("netflixDriver.distributedCacheFile", true);
} else {
other_args.add(args);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage();
}
}
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: " +
other_args.size() + " instead of 2.");
return printUsage();
}
FileInputFormat.setInputPaths(conf, other_args.get(0));
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new netflixDriver(), args);
System.exit(res);
}
}
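Assuming the classes are packaged into a jar (the jar name and HDFS paths below are hypothetical), the job can be launched with the standard hadoop jar command; the -d option must point at movie_titles.txt so it is shipped through the distributed cache:
hadoop jar netflix.jar netflixDriver -m 10 -r 4 -d /cache/movie_titles.txt /data/ratings /data/output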