package bbdt.steiss.pageRank;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class PageRank {
public static class PageMapper extends Mapper<LongWritable, Text, Text, Text>{
private Text averageValue = new Text();
private Text node = new Text();
@Override
//For each input line, compute the PageRank share this node passes to every node it links to (the current node's value divided by the number of outgoing links) and emit one (target node, share) pair per link
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
String string = value.toString();
String [] ss = string.split(",");
int length = ss.length;
double pageValue = Double.parseDouble(ss[1]);
double average = pageValue/(length-2);
averageValue.set(String.valueOf(average));
int i = 2;
while(i < length)
{
//emit the share to each node this node links to (columns 2..n of the input line)
node.set(ss[i]);
context.write(node, averageValue);
i++;
}
}
}
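//The reducer and the beginning of the driver are missing from this listing. The code below is a
//minimal sketch reconstructed from the job configuration further down; the HDFS paths and the
//initial value of result are placeholder assumptions, not the original values.
public static class PageReducer extends Reducer<Text, Text, Text, Text>{
private Text outValue = new Text();
@Override
//Sum the PageRank shares received from all nodes that link to this node
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
double sum = 0;
for (Text value : values)
{
sum += Double.parseDouble(value.toString());
}
//NOTE: the original reducer presumably also re-attaches the node's outgoing links (shipped via the
//cache archive) and writes comma-separated lines so the output can be fed back as mapper input;
//that step is omitted in this sketch
outValue.set(String.valueOf(sum));
context.write(key, outValue);
}
}
public static void main(String[] args) throws Exception
{
//Placeholder HDFS locations (assumptions): the input file, the job output directory and the cached graph structure
Path inputPath = new Path("hdfs://hadoop1:9000/pagerank/input/pagerank.txt");
Path outputPath = new Path("hdfs://hadoop1:9000/pagerank/output");
Path cachePath = new Path("hdfs://hadoop1:9000/pagerank/cache/cache.zip");
int flag = 0;
//Total change of the PageRank values in the last iteration; start above the threshold so the loop runs at least once
double result = Double.MAX_VALUE;
//Iterate until the PageRank values change by less than 0.1 in total
while(result > 0.1)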
{
if(flag == 1)
{
//Skipped on the first MapReduce iteration
//Copy the previous MapReduce output back to the input file so it becomes this iteration's input
copyFile();
flag = 0;
}
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(PageRank.class);
job.setMapperClass(PageMapper.class);
job.setReducerClass(PageReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.addCacheArchive(cachePath.toUri());
outputPath.getFileSystem(configuration).delete(outputPath, true);
job.waitForCompletion(true);
String outpathString = outputPath.toString()+"/part-r-00000";
//Compute how much the PageRank values differ between the input file and the new output
result = fileDo(inputPath, new Path(outpathString));
flag = 1;
}
System.exit(0);
}
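//copyFile() is called at the top of every iteration after the first, but its definition is missing
//from this listing. A minimal sketch, assuming a single reduce output file (part-r-00000) and the
//same placeholder paths as in main(): it rewrites the input file with the previous iteration's
//output so the next job reads the updated PageRank values.
public static void copyFile() throws Exception
{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
FileSystem fs = FileSystem.get(conf);
FSDataInputStream in = fs.open(new Path("hdfs://hadoop1:9000/pagerank/output/part-r-00000"));
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
//overwrite the input file with the freshly computed values
FSDataOutputStream out = fs.create(new Path("hdfs://hadoop1:9000/pagerank/input/pagerank.txt"), true);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
String line = null;
while ((line = reader.readLine()) != null)
{
writer.write(line);
writer.newLine();
}
writer.close();
reader.close();
}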
//Compute and return the sum of the per-node PageRank differences between the two files
public static double fileDo(Path inputPath,Path outPath) throws Exception
{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
FileSystem fs = FileSystem.get(conf);
FSDataInputStream in1 = null;
FSDataInputStream in2 = null;
in1 = fs.open(inputPath);
in2 = fs.open(outPath);
BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));
String s1 = null;
String s2 = null;
ArrayList<Double> arrayList1 = new ArrayList<Double>();
ArrayList<Double> arrayList2 = new ArrayList<Double>();
while ((s1 = br1.readLine()) != null)
{
String[] ss = s1.split(",");
arrayList1.add(Double.parseDouble(ss[1]));
}
br1.close();
while ((s2 = br2.readLine()) != null)
{
String[] ss = s2.split(",");
arrayList2.add(Double.parseDouble(ss[1]));
}
double res = 0;
for(int i = 0;i < arrayList1.size();i++)
{
//accumulate the absolute PageRank difference of each node
res += Math.abs(arrayList1.get(i) - arrayList2.get(i));
}
br2.close();
return res;
}
}