82870034 发表于 2018-10-31 06:42:58

用Hadoop构建电影推荐系统

package org.conan.myhadoop.recommend;  

  
import java.io.IOException;
  
import java.util.ArrayList;
  
import java.util.HashMap;
  
import java.util.Iterator;
  
import java.util.List;
  
import java.util.Map;
  

  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.IntWritable;
  
import org.apache.hadoop.io.LongWritable;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapred.FileInputFormat;
  
import org.apache.hadoop.mapred.FileOutputFormat;
  
import org.apache.hadoop.mapred.JobClient;
  
import org.apache.hadoop.mapred.JobConf;
  
import org.apache.hadoop.mapred.MapReduceBase;
  
import org.apache.hadoop.mapred.Mapper;
  
import org.apache.hadoop.mapred.OutputCollector;
  
import org.apache.hadoop.mapred.Reducer;
  
import org.apache.hadoop.mapred.Reporter;
  
import org.apache.hadoop.mapred.RunningJob;
  
import org.apache.hadoop.mapred.TextInputFormat;
  
import org.apache.hadoop.mapred.TextOutputFormat;
  
import org.conan.myhadoop.hdfs.HdfsDAO;
  

  
public class Step4 {
  

  
    public static class Step4_PartialMultiplyMapper extends MapReduceBase implements Mapper {
  
      private final static IntWritable k = new IntWritable();
  
      private final static Text v = new Text();
  

  
      private final static Map cooccurrenceMatrix = new HashMap();
  

  
      @Override
  
      public void map(LongWritable key, Text values, OutputCollector output, Reporter reporter) throws IOException {
  
            String[] tokens = Recommend.DELIMITER.split(values.toString());
  

  
            String[] v1 = tokens.split(":");
  
            String[] v2 = tokens.split(":");
  

  
            if (v1.length > 1) {// cooccurrence
  
                int itemID1 = Integer.parseInt(v1);
  
                int itemID2 = Integer.parseInt(v1);
  
                int num = Integer.parseInt(tokens);
  

  
                List list = null;
  
                if (!cooccurrenceMatrix.containsKey(itemID1)) {
  
                  list = new ArrayList();
  
                } else {
  
                  list = cooccurrenceMatrix.get(itemID1);
  
                }
  
                list.add(new Cooccurrence(itemID1, itemID2, num));
  
                cooccurrenceMatrix.put(itemID1, list);
  
            }
  

  
            if (v2.length > 1) {// userVector
  
                int itemID = Integer.parseInt(tokens);
  
                int userID = Integer.parseInt(v2);
  
                double pref = Double.parseDouble(v2);
  
                k.set(userID);
  
                for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) {
  
                  v.set(co.getItemID2() + "," + pref * co.getNum());
  
                  output.collect(k, v);
  
                }
  

  
            }
  
      }
  
    }
  

  
    public static class Step4_AggregateAndRecommendReducer extends MapReduceBase implements Reducer {
  
      private final static Text v = new Text();
  

  
      @Override
  
      public void reduce(IntWritable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
  
            Map result = new HashMap();
  
            while (values.hasNext()) {
  
                String[] str = values.next().toString().split(",");
  
                if (result.containsKey(str)) {
  
                  result.put(str, result.get(str) + Double.parseDouble(str));
  
                } else {
  
                  result.put(str, Double.parseDouble(str));
  
                }
  
            }
  
            Iterator iter = result.keySet().iterator();
  
            while (iter.hasNext()) {
  
                String itemID = iter.next();
  
                double score = result.get(itemID);
  
                v.set(itemID + "," + score);
  
                output.collect(key, v);
  
            }
  
      }
  
    }
  

  
    public static void run(Map path) throws IOException {
  
      JobConf conf = Recommend.config();
  

  
      String input1 = path.get("Step4Input1");
  
      String input2 = path.get("Step4Input2");
  
      String output = path.get("Step4Output");
  

  
      HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
  
      hdfs.rmr(output);
  

  
      conf.setOutputKeyClass(IntWritable.class);
  
      conf.setOutputValueClass(Text.class);
  

  
      conf.setMapperClass(Step4_PartialMultiplyMapper.class);
  
      conf.setCombinerClass(Step4_AggregateAndRecommendReducer.class);
  
      conf.setReducerClass(Step4_AggregateAndRecommendReducer.class);
  

  
      conf.setInputFormat(TextInputFormat.class);
  
      conf.setOutputFormat(TextOutputFormat.class);
  

  
      FileInputFormat.setInputPaths(conf, new Path(input1), new Path(input2));
  
      FileOutputFormat.setOutputPath(conf, new Path(output));
  

  
      RunningJob job = JobClient.runJob(conf);
  
      while (!job.isComplete()) {
  
            job.waitForCompletion();
  
      }
  
    }
  

  
}
  

  
class Cooccurrence {
  
    private int itemID1;
  
    private int itemID2;
  
    private int num;
  

  
    public Cooccurrence(int itemID1, int itemID2, int num) {
  
      super();
  
      this.itemID1 = itemID1;
  
      this.itemID2 = itemID2;
  
      this.num = num;
  
    }
  

  
    public int getItemID1() {
  
      return itemID1;
  
    }
  

  
    public void setItemID1(int itemID1) {
  
      this.itemID1 = itemID1;
  
    }
  

  
    public int getItemID2() {
  
      return itemID2;
  
    }
  

  
    public void setItemID2(int itemID2) {
  
      this.itemID2 = itemID2;
  
    }
  

  
    public int getNum() {
  
      return num;
  
    }
  

  
    public void setNum(int num) {
  
      this.num = num;
  
    }
  

  
}


页: [1]
查看完整版本: 用Hadoop构建电影推荐系统