aini 发表于 2018-10-30 08:29:23

学习日志---基于hadoop实现PageRank

package bbdt.steiss.pageRank;  

  
import java.io.BufferedReader;
  
import java.io.BufferedWriter;
  
import java.io.IOException;
  
import java.io.InputStreamReader;
  
import java.io.OutputStreamWriter;
  
import java.net.URI;
  
import java.util.ArrayList;
  
import java.util.HashMap;
  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.fs.FSDataInputStream;
  
import org.apache.hadoop.fs.FSDataOutputStream;
  
import org.apache.hadoop.fs.FileSystem;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.LongWritable;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.Job;
  
import org.apache.hadoop.mapreduce.Mapper;
  
import org.apache.hadoop.mapreduce.Reducer;
  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  

  
public class PageRank {
  

  
    public static class PageMapper extends Mapper{
  

  
      private Text averageValue = new Text();
  
      private Text node = new Text();
  

  
      @Override
  
      //把每行数据的对应节点的分pagerank找出,并输出,当前节点的值除以指向节点的总数
  
      protected void map(LongWritable key, Text value,
  
                Context context)
  
                throws IOException, InterruptedException {
  
            String string = value.toString();
  
            String [] ss = string.split(",");
  
            int length = ss.length;
  
            double pageValue = Double.parseDouble(ss);
  
            double average = pageValue/(length-2);
  
            averageValue.set(String.valueOf(average));
  
            int i = 2;
  
            while(i0.1)
  
      {
  
            if(flag == 1)
  
            {
  
                //初次调用mapreduce不操作这个
  
                //这个是把mapreduce的输出文件复制到输入文件中,作为这次mapreduce的输入文件
  
                copyFile();
  
                flag = 0;
  
            }
  
            Configuration configuration = new Configuration();
  
            Job job = Job.getInstance(configuration);
  

  
            job.setJarByClass(PageRank.class);
  
            job.setMapperClass(PageMapper.class);
  
            job.setReducerClass(PageReducer.class);
  

  
            job.setMapOutputKeyClass(Text.class);
  
            job.setMapOutputValueClass(Text.class);
  

  
            job.setOutputKeyClass(Text.class);
  
            job.setOutputValueClass(Text.class);
  

  
            job.setInputFormatClass(TextInputFormat.class);
  
            job.setOutputFormatClass(TextOutputFormat.class);
  

  
            FileInputFormat.addInputPath(job, inputPath);
  
            FileOutputFormat.setOutputPath(job, outputPath);
  
            job.addCacheArchive(cachePath.toUri());
  
            outputPath.getFileSystem(configuration).delete(outputPath, true);
  
            job.waitForCompletion(true);
  

  
            String outpathString = outputPath.toString()+"/part-r-00000";
  
            //计算两个文件的各节点的pagerank值差
  
            result = fileDo(inputPath, new Path(outpathString));
  
            flag = 1;
  
      }
  
            System.exit(0);
  
    }
  

  
    //计算两个文件的每个节点的pagerank差值,返回
  
    public static double fileDo(Path inputPath,Path outPath) throws Exception
  
    {
  
         Configuration conf = new Configuration();
  
         conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
  
         FileSystem fs = FileSystem.get(conf);
  
         FSDataInputStream in1 = null;
  
         FSDataInputStream in2 = null;
  
         in1 = fs.open(inputPath);
  
         in2 = fs.open(outPath);
  
         BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
  
         BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));
  
         String s1 = null;
  
         String s2 = null;
  
         ArrayList arrayList1 = new ArrayList();
  
         ArrayList arrayList2 = new ArrayList();
  
         while ((s1 = br1.readLine()) != null)
  
         {
  
             String[] ss = s1.split(",");
  
             arrayList1.add(Double.parseDouble(ss));
  
         }
  
         br1.close();
  

  
         while ((s2 = br2.readLine()) != null)
  
         {
  
             String[] ss = s2.split(",");
  
             arrayList2.add(Double.parseDouble(ss));
  
         }
  
         double res = 0;
  

  
         for(int i = 0;i
页: [1]
查看完整版本: 学习日志---基于hadoop实现PageRank