[Experience Sharing] Mahout recommendation engine on Hadoop (1): Generating the preference matrix

   The first step is to generate the preference matrix; this work is done by PreparePreferenceMatrixJob.

Here is how it is invoked (the relevant snippet from the driver job):

if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
      "--input", getInputPath().toString(),                 // input path
      "--output", prepPath.toString(),                      // output path
      "--maxPrefsPerUser", String.valueOf(maxPrefsPerUser),
      "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
      "--booleanData", String.valueOf(booleanData),         // whether the preferences are boolean (no rating values)
      "--tempDir", getTempPath().toString() });
}
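  The --input path points at the raw preference file. Assuming the standard Taste text format, each line is userID,itemID[,pref] (comma- or tab-separated). The small sample below is made up for illustration and is reused in the examples that follow:

1,101,5.0
1,102,3.0
2,101,2.0
2,103,4.5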

  Now let's look at the implementation of PreparePreferenceMatrixJob in detail. It consists of three MapReduce jobs:

public class PreparePreferenceMatrixJob extends AbstractJob {
  @Override
  public int run(String[] args) throws Exception {
    // (argument parsing omitted)

    // First MapReduce: map long item IDs to int indexes
    Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,
        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class,
        VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);

    // Second MapReduce: build one preference vector per user
    Job toUserVectors = prepareJob(getInputPath(), getOutputPath(USER_VECTORS), TextInputFormat.class,
        ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
        ToUserVectorsReducer.class, VarLongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);

    // Third MapReduce: transpose the user vectors into item vectors (the rating matrix)
    Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX),
        ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class, ToItemVectorsReducer.class,
        IntWritable.class, VectorWritable.class);
    toItemVectors.setCombinerClass(ToItemVectorsReducer.class);

    // (job submission/waiting and the remaining details are omitted here)
    return 0;
  }
}
  First, a word about the prepareJob() helper. Its parameters are:
    (input path, output path, [input format class,] mapper class, mapper output key, mapper output value, reducer class, reducer output key, reducer output value[, output format class])
  We don't need to read its implementation; it simply assembles and returns a Hadoop MapReduce job from these pieces. A rough, hypothetical sketch of what it sets up for the first job is shown below.
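This sketch uses the plain Hadoop Job API and is only meant to illustrate the kind of wiring prepareJob() does; it is not Mahout's actual implementation.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper;
import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;

// Hypothetical helper, only to illustrate the wiring prepareJob() performs for the itemIDIndex job.
public final class PrepareJobSketch {
  public static Job itemIDIndexJob(Configuration conf, Path input, Path output) throws IOException {
    Job job = new Job(conf, "itemIDIndex");
    job.setJarByClass(ItemIDIndexMapper.class);

    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setMapperClass(ItemIDIndexMapper.class);
    job.setMapOutputKeyClass(VarIntWritable.class);
    job.setMapOutputValueClass(VarLongWritable.class);

    job.setReducerClass(ItemIDIndexReducer.class);
    job.setOutputKeyClass(VarIntWritable.class);
    job.setOutputValueClass(VarLongWritable.class);
    return job;
  }
}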
  Now let's go through the concrete computation of each of the three MapReduce jobs in turn:
  (1) Mapper: ItemIDIndexMapper converts each long itemID into an int itemid_index and emits <itemid_index, itemID>; this is how all items get partitioned by index up front.

public final class ItemIDIndexMapper extends
    Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {

  // (the transpose flag is a field initialized in setup(), omitted here)
  @Override
  protected void map(LongWritable key,
                     Text value,
                     Context context) throws IOException, InterruptedException {
    String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
    long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
    // convert the long itemID into an int itemid_index
    int index = TasteHadoopUtils.idToIndex(itemID);
    // emit (itemid_index, itemID)
    context.write(new VarIntWritable(index), new VarLongWritable(itemID));
  }
}
  Reducer: ItemIDIndexReducer takes, for each itemid_index, the minimum of all itemIDs that map to it, and emits <itemid_index, minimumItemID>.

public final class ItemIDIndexReducer extends
    Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> {

  // for each itemid_index, take the minimum of all itemIDs and emit <itemid_index, minimumItemID>
  @Override
  protected void reduce(VarIntWritable index,
                        Iterable<VarLongWritable> possibleItemIDs,
                        Context context) throws IOException, InterruptedException {
    long minimumItemID = Long.MAX_VALUE;
    for (VarLongWritable varLongWritable : possibleItemIDs) {
      long itemID = varLongWritable.get();
      if (itemID < minimumItemID) {
        minimumItemID = itemID;
      }
    }
    if (minimumItemID != Long.MAX_VALUE) {
      context.write(index, new VarLongWritable(minimumItemID));
    }
  }
}
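  Why the minimum? idToIndex() compresses a 64-bit ID into a non-negative 32-bit index, so two different item IDs can collide on the same index; keeping the smallest colliding ID makes the index-to-itemID mapping deterministic. The small demo below uses a made-up hash (not Mahout's actual idToIndex()) just to show such a collision:

public class IndexCollisionDemo {
  // Illustrative hash only, NOT TasteHadoopUtils.idToIndex():
  // it folds a 64-bit ID into a non-negative int, so collisions are unavoidable.
  static int toIndex(long id) {
    return 0x7FFFFFFF & (int) (id ^ (id >>> 32));
  }

  public static void main(String[] args) {
    long a = 1L;
    long b = 1L << 32;   // 4294967296
    // both IDs fold to index 1; a reducer then has to pick one representative (the minimum)
    System.out.println(toIndex(a) + " " + toIndex(b));   // prints: 1 1
  }
}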
  (2) Mapper: ToItemPrefsMapper, which extends ToEntityPrefsMapper, reads the raw preference lines and emits records of the form
  (userID, <itemID, pref>) as the mapper output (i.e. the reducer's input).

public final class ToItemPrefsMapper extends ToEntityPrefsMapper {
  public ToItemPrefsMapper() {
    super(false);
  }
}

public abstract class ToEntityPrefsMapper extends
    Mapper<LongWritable,Text, VarLongWritable,VarLongWritable> {

  // (fields DELIMITER, itemKey, transpose, booleanData and ratingShift are initialized elsewhere, omitted here)
  @Override
  public void map(LongWritable key,
                  Text value,
                  Context context) throws IOException, InterruptedException {
    String[] tokens = DELIMITER.split(value.toString());
    long userID = Long.parseLong(tokens[0]);
    long itemID = Long.parseLong(tokens[1]);
    if (itemKey ^ transpose) {
      // If using items as keys, and not transposing items and users, then users are items!
      // Or if not using items as keys (users are, as usual), but transposing items and users,
      // then users are items! Confused?
      // In other words: if transposition is requested, swap userID and itemID, which turns this into user-based CF.
      long temp = userID;
      userID = itemID;
      itemID = temp;
    }
    if (booleanData) {
      context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
    } else {
      float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) + ratingShift : 1.0f;
      // emit (userID, <itemID, pref>)
      context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
    }
  }
}
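  Tracing one of the hypothetical sample lines from above through this mapper (assuming the default delimiter, no transposition, and the default ratingShift of 0):

// Hypothetical trace of a single input line through ToItemPrefsMapper:
String line = "1,101,5.0";
String[] tokens = line.split(",");          // ["1", "101", "5.0"]
long userID = Long.parseLong(tokens[0]);    // 1
long itemID = Long.parseLong(tokens[1]);    // 101
float pref  = Float.parseFloat(tokens[2]);  // 5.0
// emitted: (VarLongWritable(1), EntityPrefWritable(101, 5.0f))
// with --booleanData=true the same line would yield (VarLongWritable(1), VarLongWritable(101))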
   Reducer: ToUserVectorsReducer collects all <itemID, pref> pairs belonging to the same userID, maps each itemID to its itemid_index, stores the resulting <itemid_index, pref> pairs in a single vector (a RandomAccessSparseVector), and emits records of the form (userID, vector<itemid_index, pref>).


public final class ToUserVectorsReducer extends
    Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {

  // (the minPreferences field and the Counters enum are omitted here)
  @Override
  protected void reduce(VarLongWritable userID,
                        Iterable<VarLongWritable> itemPrefs,
                        Context context) throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
      // map the long itemID to its int itemid_index
      int index = TasteHadoopUtils.idToIndex(itemPref.get());
      float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
      userVector.set(index, value);
    }
    if (userVector.getNumNondefaultElements() >= minPreferences) {
      VectorWritable vw = new VectorWritable(userVector);
      vw.setWritesLaxPrecision(true);
      context.getCounter(Counters.USERS).increment(1);
      context.write(userID, vw);
    }
  }
}
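  Continuing with the hypothetical sample data, user 1 rated item 101 with 5.0 and item 102 with 3.0, so this reducer would build roughly the following vector (a standalone illustration; the indices are whatever idToIndex() returns for those item IDs):

// Hypothetical shape of the vector emitted for user 1:
Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
userVector.set(TasteHadoopUtils.idToIndex(101L), 5.0);
userVector.set(TasteHadoopUtils.idToIndex(102L), 3.0);
// reducer output: key = VarLongWritable(1), value = VectorWritable(userVector)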
  (3) Mapper: ToItemVectorsMapper takes the output of (2) as its input, maps the long userID to an int userid_index, and emits records of the form (itemid_index, <userid_index, pref>).

public class ToItemVectorsMapper
    extends Mapper<VarLongWritable,VectorWritable,IntWritable,VectorWritable> {

  // (the sampleSize field and the Elements counter enum are omitted here)
  @Override
  protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
      throws IOException, InterruptedException {
    Vector userRatings = vectorWritable.get();
    int numElementsBeforeSampling = userRatings.getNumNondefaultElements();
    userRatings = Vectors.maybeSample(userRatings, sampleSize);
    int numElementsAfterSampling = userRatings.getNumNondefaultElements();
    // the column of every emitted vector is this user's userid_index
    int column = TasteHadoopUtils.idToIndex(rowIndex.get());
    VectorWritable itemVector = new VectorWritable(new RandomAccessSparseVector(Integer.MAX_VALUE, 1));
    itemVector.setWritesLaxPrecision(true);
    Iterator<Vector.Element> iterator = userRatings.iterateNonZero();
    while (iterator.hasNext()) {
      Vector.Element elem = iterator.next();
      // setQuick() puts the (column, value) pair into the vector's backing map;
      // since column never changes, each iteration simply overwrites the previous value
      itemVector.get().setQuick(column, elem.get());
      // emit (itemid_index, <userid_index, pref>)
      ctx.write(new IntWritable(elem.index()), itemVector);
    }
    ctx.getCounter(Elements.USER_RATINGS_USED).increment(numElementsAfterSampling);
    ctx.getCounter(Elements.USER_RATINGS_NEGLECTED).increment(numElementsBeforeSampling - numElementsAfterSampling);
  }
}
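  For the same hypothetical user 1, this mapper fans the user vector out into one emission per rated item, with the user's index as the single column of each emitted vector. A standalone illustration (not the mapper's actual code):

// Hypothetical fan-out of user 1's vector {idToIndex(101): 5.0, idToIndex(102): 3.0}:
int userColumn = TasteHadoopUtils.idToIndex(1L);

Vector row101 = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
row101.setQuick(userColumn, 5.0);
// emit (new IntWritable(TasteHadoopUtils.idToIndex(101L)), new VectorWritable(row101))

Vector row102 = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
row102.setQuick(userColumn, 3.0);
// emit (new IntWritable(TasteHadoopUtils.idToIndex(102L)), new VectorWritable(row102))

// i.e. the user/item roles are swapped relative to the input, which is exactly a matrix transposition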
   Reducer: ToItemVectorsReducer merges all <userid_index, pref> pairs for the same itemid_index into a single vector and emits records of the form (itemid_index, vector<userid_index, pref>).

public class ToItemVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
  @Override
  protected void reduce(IntWritable row, Iterable<VectorWritable> vectors, Context ctx)
      throws IOException, InterruptedException {
    // merge all <userid_index, pref> pairs for this itemid_index into one vector
    VectorWritable vectorWritable = VectorWritable.merge(vectors.iterator());
    vectorWritable.setWritesLaxPrecision(true);
    ctx.write(row, vectorWritable);
  }
}
  At this point, all of the MapReduce computation inside PreparePreferenceMatrixJob is done. The end result is a preference (rating) matrix whose rows have the form
  (itemid_index, vector<userid_index, pref>).
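  With the hypothetical four-line sample input from the beginning, the rating matrix would contain rows along these lines (indices shown symbolically):

idToIndex(101) -> { idToIndex(1): 5.0, idToIndex(2): 2.0 }
idToIndex(102) -> { idToIndex(1): 3.0 }
idToIndex(103) -> { idToIndex(2): 4.5 }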
