设为首页 收藏本站
查看: 691|回复: 0

[经验分享] mahout推荐引擎使用hadoop(二) 计算协同矩阵

[复制链接]

尚未签到

发表于 2016-12-13 08:10:01 | 显示全部楼层 |阅读模式
  第二步,计算协同矩阵,主要在RowSimilarityJob 这个类中完成

ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
"--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
"--output", similarityMatrixPath.toString(),
"--numberOfColumns", String.valueOf(numberOfUsers),
"--similarityClassname", similarityClassname,
"--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
"--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
"--threshold", String.valueOf(threshold),
"--tempDir", getTempPath().toString()});
}
  可以看到这个job的输入路径就是上一篇中,PreparePreferenceMatrixJob中最后一个reducer的输出路径。
  下边详细分析RowSimilarityJob类的实现:

public class RowSimilarityJob extends AbstractJob {

@Override
public int run(String[] args) throws Exception {
//一大堆载入参数的代码,忽略
//第一个MapReduce
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
boolean succeeded = normsAndTranspose.waitForCompletion(true);
if (!succeeded) {
return -1;
}
}
//第二个MapReduce
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
pairwiseConf.set(NORMS_PATH, normsPath.toString());
pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
if (!succeeded) {
return -1;
}
}
//第三个MapReduce
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
VectorWritable.class);
asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
boolean succeeded = asMatrix.waitForCompletion(true);
if (!succeeded) {
return -1;
}
}
return 0;
}
  可以看到RowSimilityJob也是分成三个MapReduce过程:
  1、Mapper :VectorNormMapper类,输出 ( userid_index, <itemid_index, pref> )类型

public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
throws IOException, InterruptedException {
Vector rowVector = similarity.normalize(vectorWritable.get());
int numNonZeroEntries = 0;
double maxValue = Double.MIN_VALUE;
Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
while (nonZeroElements.hasNext()) {
Vector.Element element = nonZeroElements.next();
RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
partialColumnVector.setQuick(row.get(), element.get());
//输出 ( userid_index, <itemid_index, pref> )类型
ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
numNonZeroEntries++;
if (maxValue < element.get()) {
maxValue = element.get();
}
}
if (threshold != NO_THRESHOLD) {
nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
maxValues.setQuick(row.get(), maxValue);
}
norms.setQuick(row.get(), similarity.norm(rowVector));
//计算item的总数
ctx.getCounter(Counters.ROWS).increment(1);
}
}
  Reduer : MergeVectorsReducer类,输入的是(userid_index, <itemid_index, pref>),同一个userid_index在此进行合并,输出( userid_index, vector<itemid_index, pref> )

  public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
throws IOException, InterruptedException {
Vector partialVector = Vectors.merge(partialVectors);
if (row.get() == NORM_VECTOR_MARKER) {
Vectors.write(partialVector, normsPath, ctx.getConfiguration());
} else if (row.get() == MAXVALUE_VECTOR_MARKER) {
Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
} else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
} else {
ctx.write(row, new VectorWritable(partialVector));
}
}
}
}
   2、Mapper:CooccurrencesMapper类,对同一个userid_index下的vector<itemid_index ,pref>进行处理,
  收集<item1, item2>对, 输出为( itemid_index, vector<itemid_index, value> )

public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
throws IOException, InterruptedException {
Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
Arrays.sort(occurrences, BY_INDEX);
int cooccurrences = 0;
int prunedCooccurrences = 0;
for (int n = 0; n < occurrences.length; n++) {
Vector.Element occurrenceA = occurrences[n];
Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
for (int m = n; m < occurrences.length; m++) {
Vector.Element occurrenceB = occurrences[m];
if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
cooccurrences++;
} else {
prunedCooccurrences++;
}
}
ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
}
ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
}
}
  Reducer :SimilarityReducer类,生成协同矩阵

public static class SimilarityReducer
extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
throws IOException, InterruptedException {
Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
//取一个vecotr作为该item的行向量
Vector dots = partialDotsIterator.next().get();
while (partialDotsIterator.hasNext()) {
Vector toAdd = partialDotsIterator.next().get();
Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
while (nonZeroElements.hasNext()) {
Vector.Element nonZeroElement = nonZeroElements.next();
//nonZeroElement.index()为itemid,将另一个vecotr中itemid的value加进去
dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
}
}
//最后得到的dots是协同矩阵中行号为row的一行,行中元素是item对其他的item的相似度
Vector similarities = dots.like();
double normA = norms.getQuick(row.get());
Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
while (dotsWith.hasNext()) {
Vector.Element b = dotsWith.next();
double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
if (similarityValue >= treshold) {
similarities.set(b.index(), similarityValue);
}
}
if (excludeSelfSimilarity) {
similarities.setQuick(row.get(), 0);
}
ctx.write(row, new VectorWritable(similarities));
}
}
  3、Mapper:UnsymmetrifyMapper类,用来生成对称矩阵的。上一步得到的是非对称矩阵,首先将矩阵偏转,得到偏转矩阵,用原矩阵加上偏转矩阵,可以得到对称矩阵

public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable>  {
private int maxSimilaritiesPerRow;
@Override
protected void setup(Mapper.Context ctx) throws IOException, InterruptedException {
maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
}
@Override
protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
throws IOException, InterruptedException {
Vector similarities = similaritiesWritable.get();
// For performance reasons moved transposedPartial creation out of the while loop and reusing the same vector
Vector transposedPartial = similarities.like();
TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
//这个地方用来生成偏转矩阵的,非对称矩阵,用原矩阵加上偏转矩阵,可以得到对称矩阵
while (nonZeroElements.hasNext()) {
Vector.Element nonZeroElement = nonZeroElements.next();
topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
transposedPartial.setQuick(row.get(), nonZeroElement.get());
//偏转矩阵中的每一个元素
ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
transposedPartial.setQuick(row.get(), 0.0);
}
Vector topKSimilarities = similarities.like();
for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
}
//这里只收集前maxSimilaritiesPerRow个得分最高的item,所以咱们最后的对称矩阵,实际上每行只有
//maxSimilaritiesPerRow个是对称的,其他的位置也不管了
ctx.write(row, new VectorWritable(topKSimilarities));
}
}

   Reducer:MergeToTopKSimilaritiesReducer类,就是将上面Map偏转的元素都收集起来,也就是完成了偏转矩阵和(截取了得分前maxSimilaritiesPerRow个)的原矩阵相加的过程,得到了对称矩阵

  public static class MergeToTopKSimilaritiesReducer
extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
private int maxSimilaritiesPerRow;
@Override
protected void setup(Context ctx) throws IOException, InterruptedException {
maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
}
@Override
protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
throws IOException, InterruptedException {
Vector allSimilarities = Vectors.merge(partials);
Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
ctx.write(row, new VectorWritable(topKSimilarities));
}
}
  至此,RowSimilarityJob类的全部工作就完成,最终生成的是一个对称矩阵,也就是协同矩阵

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313433-1-1.html 上篇帖子: mahout推荐引擎使用hadoop(一) 生成偏好矩阵 下篇帖子: [转]Yahoo持续的Pig/Hadoop(MapReduce)工作流
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表