设为首页 收藏本站
查看: 754|回复: 0

[经验分享] 基于hadoop的推荐算法-mahout版

[复制链接]
发表于 2016-12-11 09:09:58 | 显示全部楼层 |阅读模式
基于hadoop的推荐算法,讲其中mahout实现的基于项目的推荐算法
分为4步:
1.获得人-物 用户矩阵
输入为所有人对物品的评价或关联
map端输出key为人,value为物品+倾好度
reeduce端输出key为人,vallue为多个物品+倾好度

2.获得物-物 项目矩阵
输入为“用户矩阵”,讲每一行人-物数据中的物品做笛卡尔积,生产成物-物的关联
map端输出为key为物,value为关联度
reduce端输出key为物,value为多个物的关联度
(可以根据各种规则生成项目相似度矩阵表,此处算法带过)
修改:
求项目相似矩阵是基于项目的协同过滤算法的核心
  公式有很多种,核心是物品i和物品j相关用户的交集与并集的商
  mahout使用的公式是1.dot(i,j) = sum(Pi(u)*Pi(u))
  2.norms(i) = sum(Pi(u)^2)
  3.simi(i,j) = 1/(1+(norms(i)-2*dot(i,j)+noorm(i))^1/2)
  
  mahout的实现方法是
  第一个job,用物品-人的矩阵,求得norms,即物品的用户平方和,输出是物-norms
  第二个job,Map:用人-物的矩阵,求Pi(u)*Pi(u),即相同用户的物品的评价的乘机,输出物-多个对端物品的Pi(u)*Pi(u)
  Reduce:用物-多个对端物品的Pi(u)*Pi(u)和物-norms,求得物品的相似矩阵(因为这个时候可以汇总所有和这个物品相关的物品的dot)
  第三个job,补全物品的相似矩阵

3.获得用户-项目相似矩阵
输入为人-物 用户矩阵 和 物-物 项目矩阵
Map端输出key为物,value为类VectorOrPrefWritable,是包含物与人的倾好度,或是物与物的相似度
reduce端输出key为物,value为类VectorAndPrefWritable,是汇总当个物品到所有人的倾好度和到所有物品的相似度

4.获得用户推荐矩阵
输入为VectorAndPrefWritable
Map端输出为key:人,value:物+系数(map端根据单个物品贡献的系数生成推荐系数,也就是人到物品A的倾好度*物品A到其他物品的相似度)
reduce端输出为key:人,,value:推荐项目+系数(reduce端使用自定公式,汇总所有单物品贡献的四叔,求人到其他项目的倾好度,取topn作为当前用户的推荐项目)
 
再在这里贴几个mahout推荐算法分析的帖子:
http://eric-gcm.iyunv.com/blog/1817822
http://eric-gcm.iyunv.com/blog/1818033
http://eric-gcm.iyunv.com/blog/1820060
 
以下是mahout代码:
 
 ItemSimilarityJob类是mahout使用hadoop做推荐引擎的主要实现类,下面开始分析。
run()函数是启动函数:
 
 
Java代码   DSC0000.png


  • public final class RecommenderJob extends AbstractJob {  
  •   
  •   public static final String BOOLEAN_DATA = "booleanData";  
  •   
  •   private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;  
  •   private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;  
  •   private static final int DEFAULT_MIN_PREFS_PER_USER = 1;  
  •   
  •   @Override  
  •   public int run(String[] args) throws Exception {  
  •     //这里原来有大一堆代码,都是用来载入配置项,不用管它  
  •   
  •     //第一步:准备矩阵,将原始数据转换为一个矩阵,在PreparePreferenceMatrixJob这个类中完成  
  •     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  •       ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{  
  •               "--input", getInputPath().toString(),  
  •               "--output", prepPath.toString(),  
  •               "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),  
  •               "--minPrefsPerUser", String.valueOf(minPrefsPerUser),  
  •               "--booleanData", String.valueOf(booleanData),  
  •               "--tempDir", getTempPath().toString()});  
  •   
  •       numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());  
  •     }  
  •   
  •     //第二步:计算协同矩阵  
  •     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  •   
  •       /* special behavior if phase 1 is skipped */  
  •       if (numberOfUsers == -1) {  
  •         numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),  
  •                 PathType.LIST, null, getConf());  
  •       }  
  •   
  •       /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like 
  •        * new DistributedRowMatrix(...).rowSimilarity(...) */  
  •       //calculate the co-occurrence matrix  
  •       ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{  
  •               "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),  
  •               "--output", similarityMatrixPath.toString(),  
  •               "--numberOfColumns", String.valueOf(numberOfUsers),  
  •               "--similarityClassname", similarityClassname,  
  •               "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),  
  •               "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),  
  •               "--threshold", String.valueOf(threshold),  
  •               "--tempDir", getTempPath().toString()});  
  •     }  
  •   
  •     //start the multiplication of the co-occurrence matrix by the user vectors  
  •     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  •       Job prePartialMultiply1 = prepareJob(  
  •               similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,  
  •               SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  •               Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  •               SequenceFileOutputFormat.class);  
  •       boolean succeeded = prePartialMultiply1.waitForCompletion(true);  
  •       if (!succeeded)   
  •         return -1;  
  •       //continue the multiplication  
  •       Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),  
  •               prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,  
  •               VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  •               SequenceFileOutputFormat.class);  
  •       if (usersFile != null) {  
  •         prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);  
  •       }  
  •       prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,  
  •               maxPrefsPerUser);  
  •       succeeded = prePartialMultiply2.waitForCompletion(true);  
  •       if (!succeeded)   
  •         return -1;  
  •       //finish the job  
  •       Job partialMultiply = prepareJob(  
  •               new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,  
  •               SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  •               ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,  
  •               SequenceFileOutputFormat.class);  
  •       setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);  
  •       succeeded = partialMultiply.waitForCompletion(true);  
  •       if (!succeeded)   
  •         return -1;  
  •     }  
  •   
  •     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  •       //filter out any users we don't care about  
  •       /* convert the user/item pairs to filter if a filterfile has been specified */  
  •       if (filterFile != null) {  
  •         Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,  
  •                 ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,  
  •                 ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,  
  •                 SequenceFileOutputFormat.class);  
  •         boolean succeeded = itemFiltering.waitForCompletion(true);  
  •         if (!succeeded)   
  •           return -1;  
  •       }  
  •   
  •       String aggregateAndRecommendInput = partialMultiplyPath.toString();  
  •       if (filterFile != null) {  
  •         aggregateAndRecommendInput += "," + explicitFilterPath;  
  •       }  
  •       //extract out the recommendations  
  •       Job aggregateAndRecommend = prepareJob(  
  •               new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,  
  •               PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,  
  •               AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,  
  •               TextOutputFormat.class);  
  •       Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();  
  •       if (itemsFile != null) {  
  •         aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);  
  •       }  
  •   
  •       if (filterFile != null) {  
  •         setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);  
  •       }  
  •       setIOSort(aggregateAndRecommend);  
  •       aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,  
  •               new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());  
  •       aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);  
  •       aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);  
  •       boolean succeeded = aggregateAndRecommend.waitForCompletion(true);  
  •       if (!succeeded)   
  •         return -1;  
  •     }  
  •   
  •     return 0;  
  •   }  

 
 
 
       第二步,计算协同矩阵,主要在RowSimilarityJob 这个类中完成
 
  
Java代码  


  • ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{  
  •               "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),  
  •               "--output", similarityMatrixPath.toString(),  
  •               "--numberOfColumns", String.valueOf(numberOfUsers),  
  •               "--similarityClassname", similarityClassname,  
  •               "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),  
  •               "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),  
  •               "--threshold", String.valueOf(threshold),  
  •               "--tempDir", getTempPath().toString()});  
  •     }  

   可以看到这个job的输入路径就是上一篇中,PreparePreferenceMatrixJob中最后一个reducer的输出路径。
 
下边详细分析RowSimilarityJob类的实现:
 
Java代码  


  • public class RowSimilarityJob extends AbstractJob {  
  •   
  •   
  •   @Override  
  •   public int run(String[] args) throws Exception {  
  •     //一大堆载入参数的代码,忽略  
  •       
  •     //第一个MapReduce  
  •     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  •       Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,  
  •           VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);  
  •       normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);  
  •       Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();  
  •       normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));  
  •       normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());  
  •       normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());  
  •       normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());  
  •       normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);  
  •       boolean succeeded = normsAndTranspose.waitForCompletion(true);  
  •       if (!succeeded) {  
  •         return -1;  
  •       }  
  •     }  
  •     //第二个MapReduce  
  •     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  •       Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,  
  •           IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);  
  •       pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);  
  •       Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();  
  •       pairwiseConf.set(THRESHOLD, String.valueOf(threshold));  
  •       pairwiseConf.set(NORMS_PATH, normsPath.toString());  
  •       pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());  
  •       pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());  
  •       pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);  
  •       pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);  
  •       pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);  
  •       boolean succeeded = pairwiseSimilarity.waitForCompletion(true);  
  •       if (!succeeded) {  
  •         return -1;  
  •       }  
  •     }  
  •     //第三个MapReduce  
  •     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  •       Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,  
  •           IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,  
  •           VectorWritable.class);  
  •       asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);  
  •       asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);  
  •       boolean succeeded = asMatrix.waitForCompletion(true);  
  •       if (!succeeded) {  
  •         return -1;  
  •       }  
  •     }  
  •   
  •     return 0;  
  •   }  

 
 
 可以看到RowSimilityJob也是分成三个MapReduce过程:
 
1、Mapper :VectorNormMapper类,输出 ( userid_index, <itemid_index, pref> )类型
Java代码  


  • public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  •   
  •     @Override  
  •     protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)  
  •         throws IOException, InterruptedException {  
  •   
  •       Vector rowVector = similarity.normalize(vectorWritable.get());  
  •   
  •       int numNonZeroEntries = 0;  
  •       double maxValue = Double.MIN_VALUE;  
  •   
  •       Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();  
  •       while (nonZeroElements.hasNext()) {  
  •         Vector.Element element = nonZeroElements.next();  
  •         RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);  
  •         partialColumnVector.setQuick(row.get(), element.get());  
  •         //输出 ( userid_index, <itemid_index, pref> )类型  
  •         ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));  
  •   
  •         numNonZeroEntries++;  
  •         if (maxValue < element.get()) {  
  •           maxValue = element.get();  
  •         }  
  •       }  
  •   
  •       if (threshold != NO_THRESHOLD) {  
  •         nonZeroEntries.setQuick(row.get(), numNonZeroEntries);  
  •         maxValues.setQuick(row.get(), maxValue);  
  •       }  
  •       norms.setQuick(row.get(), similarity.norm(rowVector));  
  •       //计算item的总数  
  •       ctx.getCounter(Counters.ROWS).increment(1);  
  •     }  
  • }  

 
Reduer : MergeVectorsReducer类,输入的是(userid_index, <itemid_index, pref>),同一个userid_index在此进行合并,输出( userid_index, vector<itemid_index, pref> )
Java代码  


  •   public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  •   
  •     @Override  
  •     protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)  
  •         throws IOException, InterruptedException {  
  •       Vector partialVector = Vectors.merge(partialVectors);  
  •   
  •       if (row.get() == NORM_VECTOR_MARKER) {  
  •         Vectors.write(partialVector, normsPath, ctx.getConfiguration());  
  •       } else if (row.get() == MAXVALUE_VECTOR_MARKER) {  
  •         Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());  
  •       } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {  
  •         Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);  
  •       } else {  
  •         ctx.write(row, new VectorWritable(partialVector));  
  •       }  
  •     }  
  •   }  
  • }  

 2、Mapper:CooccurrencesMapper类,对同一个userid_index下的vector<itemid_index ,pref>进行处理,
收集<item1, item2>对, 输出为( itemid_index, vector<itemid_index, value> )
Java代码  


  • public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  •   
  •     @Override  
  •     protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)  
  •         throws IOException, InterruptedException {  
  •       Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);  
  •       Arrays.sort(occurrences, BY_INDEX);  
  •   
  •       int cooccurrences = 0;  
  •       int prunedCooccurrences = 0;  
  •       for (int n = 0; n < occurrences.length; n++) {  
  •         Vector.Element occurrenceA = occurrences[n];  
  •         Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);  
  •         for (int m = n; m < occurrences.length; m++) {  
  •           Vector.Element occurrenceB = occurrences[m];  
  •           if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {  
  •             dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));  
  •             cooccurrences++;  
  •           } else {  
  •             prunedCooccurrences++;  
  •           }  
  •         }  
  •         ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));  
  •       }  
  •       ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);  
  •       ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);  
  •     }  
  •   }  

 
Reducer :SimilarityReducer类,生成协同矩阵
 
Java代码  


  • public static class SimilarityReducer  
  •       extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  •   
  •     @Override  
  •     protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)  
  •         throws IOException, InterruptedException {  
  •       Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();  
  •       //取一个vecotr作为该item的行向量  
  •       Vector dots = partialDotsIterator.next().get();  
  •       while (partialDotsIterator.hasNext()) {  
  •         Vector toAdd = partialDotsIterator.next().get();  
  •         Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();  
  •         while (nonZeroElements.hasNext()) {  
  •           Vector.Element nonZeroElement = nonZeroElements.next();  
  •           //nonZeroElement.index()为itemid,将另一个vecotr中itemid的value加进去  
  •           dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());  
  •         }  
  •       }  
  •       //最后得到的dots是协同矩阵中行号为row的一行,行中元素是item对其他的item的相似度  
  •       Vector similarities = dots.like();  
  •       double normA = norms.getQuick(row.get());  
  •       Iterator<Vector.Element> dotsWith = dots.iterateNonZero();  
  •       while (dotsWith.hasNext()) {  
  •         Vector.Element b = dotsWith.next();  
  •         double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);  
  •         if (similarityValue >= treshold) {  
  •           similarities.set(b.index(), similarityValue);  
  •         }  
  •       }  
  •       if (excludeSelfSimilarity) {  
  •         similarities.setQuick(row.get(), 0);  
  •       }  
  •       ctx.write(row, new VectorWritable(similarities));  
  •     }  
  •   }  

 
3、Mapper:UnsymmetrifyMapper类,用来生成对称矩阵的。上一步得到的是非对称矩阵,首先将矩阵偏转,得到偏转矩阵,用原矩阵加上偏转矩阵,可以得到对称矩阵
 
Java代码  


  • public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable>  {  
  •   
  •     private int maxSimilaritiesPerRow;  
  •   
  •     @Override  
  •     protected void setup(Mapper.Context ctx) throws IOException, InterruptedException {  
  •       maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);  
  •       Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");  
  •     }  
  •   
  •     @Override  
  •     protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)  
  •         throws IOException, InterruptedException {  
  •       Vector similarities = similaritiesWritable.get();  
  •       // For performance reasons moved transposedPartial creation out of the while loop and reusing the same vector  
  •       Vector transposedPartial = similarities.like();  
  •       TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);  
  •       Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();  
  •       //这个地方用来生成偏转矩阵的,非对称矩阵,用原矩阵加上偏转矩阵,可以得到对称矩阵  
  •       while (nonZeroElements.hasNext()) {  
  •         Vector.Element nonZeroElement = nonZeroElements.next();  
  •         topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));  
  •           
  •         transposedPartial.setQuick(row.get(), nonZeroElement.get());  
  •         //偏转矩阵中的每一个元素  
  •         ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));  
  •           
  •         transposedPartial.setQuick(row.get(), 0.0);  
  •       }  
  •       Vector topKSimilarities = similarities.like();  
  •       for (Vector.Element topKSimilarity : topKQueue.retrieve()) {  
  •         topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());  
  •       }  
  •       //这里只收集前maxSimilaritiesPerRow个得分最高的item,所以咱们最后的对称矩阵,实际上每行只有  
  •       //maxSimilaritiesPerRow个是对称的,其他的位置也不管了  
  •       ctx.write(row, new VectorWritable(topKSimilarities));  
  •     }  
  •   }  

 
 Reducer:MergeToTopKSimilaritiesReducer类,就是将上面Map偏转的元素都收集起来,也就是完成了偏转矩阵和(截取了得分前maxSimilaritiesPerRow个)的原矩阵相加的过程,得到了对称矩阵
Java代码  


  • public static class MergeToTopKSimilaritiesReducer  
  •     extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  •   
  •   private int maxSimilaritiesPerRow;  
  •   
  •   @Override  
  •   protected void setup(Context ctx) throws IOException, InterruptedException {  
  •     maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);  
  •     Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");  
  •   }  
  •   
  •   @Override  
  •   protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)  
  •       throws IOException, InterruptedException {  
  •     Vector allSimilarities = Vectors.merge(partials);  
  •     Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);  
  •     ctx.write(row, new VectorWritable(topKSimilarities));  
  •   }  
  • }  

 
至此,RowSimilarityJob类的全部工作就完成,最终生成的是一个对称矩阵,也就是协同矩阵
 
 
 
Java代码  


  • //协同矩阵与用户向量相乘  
  •     //start the multiplication of the co-occurrence matrix by the user vectors  
  •     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  •       //第一个MapReducer  
  •       Job prePartialMultiply1 = prepareJob(  
  •               similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,  
  •               SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  •               Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  •               SequenceFileOutputFormat.class);  
  •       boolean succeeded = prePartialMultiply1.waitForCompletion(true);  
  •       if (!succeeded)   
  •         return -1;  
  •       //第二个MapReduce  
  •       //continue the multiplication  
  •       Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),  
  •               prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,  
  •               VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  •               SequenceFileOutputFormat.class);  
  •       if (usersFile != null) {  
  •         prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);  
  •       }  
  •       prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,  
  •               maxPrefsPerUser);  
  •       succeeded = prePartialMultiply2.waitForCompletion(true);  
  •       if (!succeeded)   
  •         return -1;  
  •       //finish the job  
  •       //第三个MapReduce  
  •       Job partialMultiply = prepareJob(  
  •               new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,  
  •               SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  •               ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,  
  •               SequenceFileOutputFormat.class);  
  •       setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);  
  •       succeeded = partialMultiply.waitForCompletion(true);  
  •       if (!succeeded)   
  •         return -1;  
  •     }  

 
 下边也是同样分析一下这个三个MapReduce的细节:
 
1、Mapper: SimilarityMatrixRowWrapperMapper 类,将协同矩阵的一行拿出来,通过包装,封装成VectorOrPrefWritable类,与那边的UserVectorSplitterMapper 的输出类型一致
Java代码  


  • public final class SimilarityMatrixRowWrapperMapper extends  
  •     Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {  
  •     
  •   //将协同矩阵的一行拿出来,通过包装,封装成VectorOrPrefWritable类,与那边的UserVectorSplitterMapper  
  •   //的输出类型一致  
  •   @Override  
  •   protected void map(IntWritable key,  
  •                      VectorWritable value,  
  •                      Context context) throws IOException, InterruptedException {  
  •     Vector similarityMatrixRow = value.get();  
  •     /* remove self similarity */  
  •     similarityMatrixRow.set(key.get(), Double.NaN);  
  •     context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));  
  •   }  
  •   
  • }  

 
 
2、Mapper:UserVectorSplitterMapper类
Java代码  


  • //输入格式: theUserID:<itemid_index1,pref1>,<itemid_index2,pref2>........<itemid_indexN,prefN>  
  •   //输出格式:  itemid1:<theUserID,pref1>  
  •   //          itemid2:<theUserID,pref2>  
  •   //          itemid3:<theUserID,pref3>  
  •   //          ......  
  •   //          itemidN:<theUserID,prefN>  

Java代码  


  • public final class UserVectorSplitterMapper extends  
  •     Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {  
  •     
  •   @Override  
  •   protected void map(VarLongWritable key,  
  •                      VectorWritable value,  
  •                      Context context) throws IOException, InterruptedException {  
  •     long userID = key.get();  
  •     if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {  
  •       return;  
  •     }  
  •     Vector userVector = maybePruneUserVector(value.get());  
  •     Iterator<Vector.Element> it = userVector.iterateNonZero();  
  •     VarIntWritable itemIndexWritable = new VarIntWritable();  
  •     VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();  
  •     while (it.hasNext()) {  
  •       Vector.Element e = it.next();  
  •       itemIndexWritable.set(e.index());  
  •       vectorOrPref.set(userID, (float) e.get());  
  •       context.write(itemIndexWritable, vectorOrPref);  
  •     }  
  •   }  

 
3、Reduce:ToVectorAndPrefReducer类,收集协同矩阵为itemid的一行,并且收集评价过该item的用户和评分,最后的输出是 itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>)
 
Java代码  


  • public final class ToVectorAndPrefReducer extends  
  •     Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {  
  •   
  •   //收集所有key为itemid的  
  •   @Override  
  •   protected void reduce(VarIntWritable key,  
  •                         Iterable<VectorOrPrefWritable> values,  
  •                         Context context) throws IOException, InterruptedException {  
  •   
  •     List<Long> userIDs = Lists.newArrayList();  
  •     List<Float> prefValues = Lists.newArrayList();  
  •     Vector similarityMatrixColumn = null;  
  •     for (VectorOrPrefWritable value : values) {  
  •       if (value.getVector() == null) {  
  •         // Then this is a user-pref value  
  •         userIDs.add(value.getUserID());  
  •         prefValues.add(value.getValue());  
  •       } else {  
  •         // Then this is the column vector  
  •         //协同矩阵的一个行(行号为itemid的一行)  
  •         if (similarityMatrixColumn != null) {  
  •           throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());  
  •         }  
  •         similarityMatrixColumn = value.getVector();  
  •       }  
  •     }  
  •   
  •     if (similarityMatrixColumn == null) {  
  •       return;  
  •     }  
  •     //收集协同矩阵为itemid的一行,并且手机评价过该item的用户和评分  
  •     VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);  
  •     context.write(key, vectorAndPrefs);  
  •   }  
  •   
  • }  

 
第四步,协同矩阵和用户向量相乘,得到推荐结果
 
Java代码  


  • //extract out the recommendations  
  •      Job aggregateAndRecommend = prepareJob(  
  •              new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,  
  •              PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,  
  •              AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,  
  •              TextOutputFormat.class);  
  •      Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();  

 
 
Mapper:PartialMultiplyMapper类
 
Java代码  


  • //输入类型:( itemid_index, <userid的数组,pref的数组,协同矩阵行号为itemid_index的行> )  
  • //输出类型: userid,<该用户对itemid_index1的评分,协同矩阵行号为itemid_index1的行> )  
  • //        userid,<该用户对itemid_index2的评分,协同矩阵行号为itemid_index2的行> )  
  • //                       .....    
  • //                       .....  
  • //          userid,<该用户对itemid_indexN的评分,协同矩阵行号为itemid_indexN的行> )  

 
 
 
Java代码  


  • public final class PartialMultiplyMapper extends  
  •     Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {  
  •   
  •   @Override  
  •   protected void map(VarIntWritable key,  
  •                      VectorAndPrefsWritable vectorAndPrefsWritable,  
  •                      Context context) throws IOException, InterruptedException {  
  •   
  •     Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();  
  •     List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();  
  •     List<Float> prefValues = vectorAndPrefsWritable.getValues();  
  •   
  •     VarLongWritable userIDWritable = new VarLongWritable();  
  •     PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();  
  •   
  •     for (int i = 0; i < userIDs.size(); i++) {  
  •       long userID = userIDs.get(i);  
  •       float prefValue = prefValues.get(i);  
  •       if (!Float.isNaN(prefValue)) {  
  •         prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);  
  •         userIDWritable.set(userID);  
  •         context.write(userIDWritable, prefAndSimilarityColumn);  
  •       }  
  •     }  
  •   }  
  •   
  • }  

 
 Reducer:AggregateAndRecommendReducer类,Reducer中进行PartialMultiply,按乘积得到的推荐度的大小取出最大的几个item。对于非booleanData,是用pref和相似度矩阵的PartialMultiply得到推荐度的值来进行排序。
而booleanData的pref值都是1.0f,所以去计算矩阵相乘的过程没有意义,直接累加相似度的值即可。
用这个数据排序就可得到推荐结果
 
Java代码  


  • public final class AggregateAndRecommendReducer extends  
  •     Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {  
  •  @Override  
  •   protected void reduce(VarLongWritable userID,  
  •                         Iterable<PrefAndSimilarityColumnWritable> values,  
  •                         Context context) throws IOException, InterruptedException {  
  •     if (booleanData) {  
  •       reduceBooleanData(userID, values, context);  
  •     } else {  
  •       reduceNonBooleanData(userID, values, context);  
  •     }  
  •   }  
  •   
  •   private void reduceBooleanData(VarLongWritable userID,  
  •                                  Iterable<PrefAndSimilarityColumnWritable> values,  
  •                                  Context context) throws IOException, InterruptedException {  
  •     /* having boolean data, each estimated preference can only be 1, 
  •      * however we can't use this to rank the recommended items, 
  •      * so we use the sum of similarities for that. */  
  •     Vector predictionVector = null;  
  •     for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {  
  •       predictionVector = predictionVector == null  
  •           ? prefAndSimilarityColumn.getSimilarityColumn()  
  •           : predictionVector.plus(prefAndSimilarityColumn.getSimilarityColumn());  
  •     }  
  •     writeRecommendedItems(userID, predictionVector, context);  
  •   }  
  •   
  •   private void reduceNonBooleanData(VarLongWritable userID,  
  •                         Iterable<PrefAndSimilarityColumnWritable> values,  
  •                         Context context) throws IOException, InterruptedException {  
  •     /* each entry here is the sum in the numerator of the prediction formula */  
  •     Vector numerators = null;  
  •     /* each entry here is the sum in the denominator of the prediction formula */  
  •     Vector denominators = null;  
  •     /* each entry here is the number of similar items used in the prediction formula */  
  •     Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);  
  •   
  •     for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {  
  •       Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();  
  •       float prefValue = prefAndSimilarityColumn.getPrefValue();  
  •       /* count the number of items used for each prediction */  
  •       Iterator<Vector.Element> usedItemsIterator = simColumn.iterateNonZero();  
  •       while (usedItemsIterator.hasNext()) {  
  •         int itemIDIndex = usedItemsIterator.next().index();  
  •         numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);  
  •       }  
  •       //vector.times(float) 是向量乘于一个数,也就是向量的每一个值都乘以这个数  
  •       //vector.plus(vector) 是两个向量相加,每一个位置上的值相加  
  •         
  •       //numerators是一个vecotr,每一个元素是这样的  
  •       /* 
  •                 例如index为item1的元素的值为: 
  •        simility(item1, item_2)*pref(userid, item_2) 
  •       + simility(item_1, item_3)*pref(userid, item_3) 
  •       + simility(item1, item_4)*pref(userid, item_4) 
  •       + ……  
  •       + simility(item_1, item_2)*pref(userid, item_N) 
  •       */  
  •       // 注:其中simility(item1, item2)代表物品item1和物品item2的相似度 ,pref(userid, item)代表用于userid对item打分分值   
  •        
  •       numerators = numerators == null  
  •           ? prefValue == BOOLEAN_PREF_VALUE ? simColumn.clone() : simColumn.times(prefValue)  
  •           : numerators.plus(prefValue == BOOLEAN_PREF_VALUE ? simColumn : simColumn.times(prefValue));  
  •         
  •         
  •         
  •       simColumn.assign(ABSOLUTE_VALUES);  
  •       //denominators是一个vecotr,每一个元素是这样的  
  •       /* 
  •                 例如index为item1的元素的值为: 
  •        simility(item1, item_2)+ simility(item_1, item_3)+ …… + simility(item_1, item_2)*pref(userid, item_N) 
  •       */  
  •       // 注:其中simility(item1, item2)代表物品item1和物品item2的相似度  
  •       denominators = denominators == null ? simColumn : denominators.plus(simColumn);  
  •     }  
  •   
  •     if (numerators == null) {  
  •       return;  
  •     }  
  •   
  •     Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);  
  •     Iterator<Vector.Element> iterator = numerators.iterateNonZero();  
  •     while (iterator.hasNext()) {  
  •       Vector.Element element = iterator.next();  
  •       int itemIDIndex = element.index();  
  •       /* preference estimations must be based on at least 2 datapoints */  
  •       if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {  
  •         /* compute normalized prediction */  
  •         //计算归一化预测值  
  •         double prediction = element.get() / denominators.getQuick(itemIDIndex);  
  •         recommendationVector.setQuick(itemIDIndex, prediction);  
  •       }  
  •     }  
  •     writeRecommendedItems(userID, recommendationVector, context);  
  •   }  
  • }  

 
 
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312592-1-1.html 上篇帖子: Hadoop开发者1-4期打包整理下载,需要的赶紧(附内容说明,先看后下) 下篇帖子: 在eclipse下搭建hadoop开发环境各种问题的解决
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表