设为首页 收藏本站
查看: 4139|回复: 0

[经验分享] (转)mahout推荐引擎使用hadoop

[复制链接]

尚未签到

发表于 2015-7-13 09:42:32 | 显示全部楼层 |阅读模式
  Taste 是 Apache Mahout 提供的一个协同过滤算法的高效实现,它是一个基于Java实现的可扩展的高效的推荐引擎。扩展性是指使用hadoop进行mapreduce计算,提高运算性能。
  最近开始看源码,分析一下,做个笔记。 ItemSimilarityJob类是mahout使用hadoop做推荐引擎的主要实现类,下面开始分析。
  run()函数是启动函数:



public final class RecommenderJob extends AbstractJob {
public static final String BOOLEAN_DATA = "booleanData";
private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
@Override
public int run(String[] args) throws Exception {
//这里原来有大一堆代码,都是用来载入配置项,不用管它
//第一步:准备矩阵,将原始数据转换为一个矩阵,在PreparePreferenceMatrixJob这个类中完成
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
"--input", getInputPath().toString(),
"--output", prepPath.toString(),
"--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
"--minPrefsPerUser", String.valueOf(minPrefsPerUser),
"--booleanData", String.valueOf(booleanData),
"--tempDir", getTempPath().toString()});
numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
}
//第二步:计算协同矩阵
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
/* special behavior if phase 1 is skipped */
if (numberOfUsers == -1) {
numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
PathType.LIST, null, getConf());
}
/* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
* new DistributedRowMatrix(...).rowSimilarity(...) */
//calculate the co-occurrence matrix
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
"--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
"--output", similarityMatrixPath.toString(),
"--numberOfColumns", String.valueOf(numberOfUsers),
"--similarityClassname", similarityClassname,
"--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
"--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
"--threshold", String.valueOf(threshold),
"--tempDir", getTempPath().toString()});
}
//start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job prePartialMultiply1 = prepareJob(
similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
boolean succeeded = prePartialMultiply1.waitForCompletion(true);
if (!succeeded)
return -1;
//continue the multiplication
Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
if (usersFile != null) {
prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
}
prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
maxPrefsPerUser);
succeeded = prePartialMultiply2.waitForCompletion(true);
if (!succeeded)
return -1;
//finish the job
Job partialMultiply = prepareJob(
new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
SequenceFileOutputFormat.class);
setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
succeeded = partialMultiply.waitForCompletion(true);
if (!succeeded)
return -1;
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
//filter out any users we don't care about
/* convert the user/item pairs to filter if a filterfile has been specified */
if (filterFile != null) {
Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
SequenceFileOutputFormat.class);
boolean succeeded = itemFiltering.waitForCompletion(true);
if (!succeeded)
return -1;
}
String aggregateAndRecommendInput = partialMultiplyPath.toString();
if (filterFile != null) {
aggregateAndRecommendInput += "," + explicitFilterPath;
}
//extract out the recommendations
Job aggregateAndRecommend = prepareJob(
new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
TextOutputFormat.class);
Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
if (itemsFile != null) {
aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
}
if (filterFile != null) {
setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
}
setIOSort(aggregateAndRecommend);
aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
boolean succeeded = aggregateAndRecommend.waitForCompletion(true);
if (!succeeded)
return -1;
}
return 0;
}

  详细内容请看 http://eric-gcm.iteye.com/blog/1816547

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-86135-1-1.html 上篇帖子: Hadoop Streaming框架使用(二) 下篇帖子: Hadoop中RPC机制
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表