[Experience Sharing] Implementing Collaborative Filtering with Hadoop (example in chapter 6), Part 1


[复制链接]

尚未签到

发表于 2016-12-12 09:41:59 | 显示全部楼层 |阅读模式
  I have been working through "Mahout in Action" lately and have just finished the first part of the book. Chapter 6 contains an example that implements collaborative filtering for recommendation, but the book's version handles only Boolean (implicit) preference data. The Mahout installation directory also ships the full source of this algorithm, but production source is rather hard to read, so I followed the book's example (whose structure is much clearer) and wrote a version that accepts real preference values rather than only Boolean input.
  The approach and the code are described in detail below.
  Input data:

Each line holds a user ID, then an item ID and a preference value, separated by commas:
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0


The first MR job simply consolidates each user's input records into a single preference vector, producing output like the following:

userid:1,vector:{103:2.5,102:3.0,101:5.0}
userid:2,vector:{104:2.0,103:5.0,102:2.5,101:2.0}
userid:3,vector:{107:5.0,105:4.5,104:4.0,101:2.5}
userid:4,vector:{106:4.0,104:4.5,103:3.0,101:5.0}
userid:5,vector:{106:4.0,105:3.5,104:4.0,103:2.0,102:3.0,101:4.0}
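  To make this aggregation concrete, here is a minimal, non-Hadoop sketch (not part of the original post; the class name is made up for illustration) that builds the same kind of per-user vector in memory with Mahout's RandomAccessSparseVector, using only user 1's three lines for brevity:

import java.util.HashMap;
import java.util.Map;

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class UserVectorSketch {
    public static void main(String[] args) {
        String[] lines = {"1,101,5.0", "1,102,3.0", "1,103,2.5"};
        Map<Long, Vector> userVectors = new HashMap<Long, Vector>();
        for (String line : lines) {
            String[] f = line.split(",");
            long userID = Long.parseLong(f[0]);
            Vector v = userVectors.get(userID);
            if (v == null) {
                v = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
                userVectors.put(userID, v);
            }
            // item ID becomes the vector index, the preference becomes the value
            v.set(Integer.parseInt(f[1]), Float.parseFloat(f[2]));
        }
        // prints something like {101:5.0,102:3.0,103:2.5}, matching the job-1 output above
        System.out.println(userVectors.get(1L));
    }
}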
  

  A small class holding the global constants:
  WiKiUtils.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

public class WiKiUtils {
    public static final String PATH = "hdfs://fansyonepc:9000/user/fansy/date1012/wikifirst/";
    public static int RECOMMENDATIONSPERUSER = 5;
    // read in the setup() of the part-2 reducer to look up the items a user has already rated
    public static String JOB1OUTPATH = PATH + "job1/part-r-00000";
}

  WiKiDriver1.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

// Job 1: read the raw "user,item,preference" triples and build one preference vector per user.
public class WiKiDriver1 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver1 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job one");
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver1.class);
        job1.setMapperClass(WikiMapper1.class);
        job1.setMapOutputKeyClass(VarLongWritable.class);
        job1.setMapOutputValueClass(LongAndFloat.class);
        job1.setReducerClass(WiKiReducer1.class);
        job1.setOutputKeyClass(VarLongWritable.class);
        job1.setOutputValueClass(VectorWritable.class);
        FileInputFormat.addInputPath(job1, new Path("hdfs://fansyonepc:9000/user/fansy/input/" + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1); // exit on job failure
        }
    }
}
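  Assuming the classes are packaged into a jar (the jar and input file names below are only illustrative), job 1 might be launched along the lines of: hadoop jar wiki-cf.jar org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiDriver1 user.csv job1. The first argument is resolved against hdfs://fansyonepc:9000/user/fansy/input/, and the second is appended to WiKiUtils.PATH, so passing "job1" matches the JOB1OUTPATH constant defined above. The later drivers are invoked the same way: WiKiDriver2 takes job 1's output directory as its input, WiKiDriver31 takes job 2's, and WiKiDriver32 takes job 1's again.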
WikiMapper1.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

// Mapper 1: parse each "userID,itemID,preference" line and emit <userID, (itemID, preference)>.
public class WikiMapper1 extends Mapper<LongWritable, Text, VarLongWritable, LongAndFloat> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        VarLongWritable userID = new VarLongWritable();
        LongWritable itemID = new LongWritable();
        FloatWritable itemValue = new FloatWritable();
        String line = value.toString();
        String[] info = line.split(",");
        if (info.length != 3) {
            return; // skip malformed lines
        }
        userID.set(Long.parseLong(info[0]));
        itemID.set(Long.parseLong(info[1]));
        itemValue.set(Float.parseFloat(info[2]));
        context.write(userID, new LongAndFloat(itemID, itemValue));
    }
}
WiKiReducer1.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WiKiReducer1 extends Reducer<VarLongWritable, LongAndFloat, VarLongWritable, VectorWritable> {

    public void reduce(VarLongWritable userID, Iterable<LongAndFloat> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // RandomAccessSparseVector(int cardinality, int initialCapacity)
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        for (LongAndFloat itemPref : itemPrefs) {
            userVector.set(Integer.parseInt(itemPref.getFirst().toString()),
                    Float.parseFloat(itemPref.getSecond().toString()));
        }
        context.write(userID, new VectorWritable(userVector));
    }
}
LongAndFloat.java (a small custom Writable type that stores an item ID together with its preference value):

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;

public class LongAndFloat implements WritableComparable<LongAndFloat> {

    private LongWritable first;
    private FloatWritable second;

    public LongAndFloat() {
        set(new LongWritable(), new FloatWritable());
    }

    public LongAndFloat(LongWritable l, FloatWritable f) {
        set(l, f);
    }

    public void set(LongWritable longWritable, FloatWritable floatWritable) {
        this.first = longWritable;
        this.second = floatWritable;
    }

    public LongWritable getFirst() {
        return first;
    }

    public FloatWritable getSecond() {
        return second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public int compareTo(LongAndFloat o) {
        int cmp = first.compareTo(o.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(o.second);
    }
}
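  As a quick sanity check (this test class is not part of the original post; it only requires the Hadoop jars on the classpath), LongAndFloat can be round-tripped through the Writable serialization it implements:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;

public class LongAndFloatTest {
    public static void main(String[] args) throws IOException {
        LongAndFloat original = new LongAndFloat(new LongWritable(101L), new FloatWritable(5.0f));

        // serialize with the write() method Hadoop would call
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize into a fresh instance with readFields()
        LongAndFloat copy = new LongAndFloat();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // prints 101,5.0
        System.out.println(copy.getFirst() + "," + copy.getSecond());
    }
}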



  The second MR job:

Its input is the output of MR(1), and it only computes the item co-occurrence counts that serve as the similarity matrix. The user ID is ignored: for every user vector, each pair of items it contains is emitted, and the reducer counts how often each pair co-occurs. The output should look like this:

101,{107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0}
102,{106:1.0,105:1.0,104:2.0,103:3.0,102:3.0,101:3.0}

WiKiDriver2.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.*;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.math.VectorWritable;

// Job 2: build the item co-occurrence matrix from the user vectors produced by job 1.
public class WiKiDriver2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver2 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job two");
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver2.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setMapperClass(WikiMapper2.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setReducerClass(WiKiReducer2.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(VectorWritable.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1); // exit on job failure
        }
    }
}
WikiMapper2.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

// Mapper 2: for each user vector, emit every (itemA, itemB) pair of items that user has rated.
public class WikiMapper2 extends Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {

    public void map(VarLongWritable userID, VectorWritable userVector, Context context)
            throws IOException, InterruptedException {
        Iterator<Vector.Element> it = userVector.get().iterateNonZero();
        while (it.hasNext()) {
            int index1 = it.next().index();
            Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
            while (it2.hasNext()) {
                int index2 = it2.next().index();
                context.write(new IntWritable(index1), new IntWritable(index2));
            }
        }
    }
}
WiKiReducer2.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

// Reducer 2: count how often each item co-occurs with itemIndex1 and store the counts in one row vector.
public class WiKiReducer2 extends Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {

    public void reduce(IntWritable itemIndex1, Iterable<IntWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // RandomAccessSparseVector(int cardinality, int initialCapacity)
        Vector itemVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        for (IntWritable itemPref : itemPrefs) {
            int itemIndex2 = itemPref.get();
            itemVector.set(itemIndex2, itemVector.get(itemIndex2) + 1.0);
        }
        context.write(itemIndex1, new VectorWritable(itemVector));
    }
}
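  To see that this counting really produces the row quoted above for item 101, here is a small in-memory sketch (not part of the original post; the class name is made up) that mirrors what WikiMapper2 and WiKiReducer2 do, using the sample input:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CooccurrenceSketch {
    public static void main(String[] args) {
        String[] lines = {
            "1,101,5.0", "1,102,3.0", "1,103,2.5",
            "2,101,2.0", "2,102,2.5", "2,103,5.0", "2,104,2.0",
            "3,101,2.5", "3,104,4.0", "3,105,4.5", "3,107,5.0",
            "4,101,5.0", "4,103,3.0", "4,104,4.5", "4,106,4.0",
            "5,101,4.0", "5,102,3.0", "5,103,2.0", "5,104,4.0", "5,105,3.5", "5,106,4.0"};
        // group the item IDs by user, just as job 1 does
        Map<String, List<Integer>> userItems = new HashMap<String, List<Integer>>();
        for (String line : lines) {
            String[] f = line.split(",");
            List<Integer> items = userItems.get(f[0]);
            if (items == null) {
                items = new ArrayList<Integer>();
                userItems.put(f[0], items);
            }
            items.add(Integer.valueOf(f[1]));
        }
        // count co-occurrences with item 101, just as WikiMapper2 + WiKiReducer2 do for every item
        Map<Integer, Integer> row101 = new TreeMap<Integer, Integer>();
        for (List<Integer> items : userItems.values()) {
            if (!items.contains(101)) {
                continue; // only the row for item 101 is reconstructed here
            }
            for (Integer other : items) {
                Integer old = row101.get(other);
                row101.put(other, old == null ? 1 : old + 1);
            }
        }
        // prints {101=5, 102=3, 103=4, 104=4, 105=2, 106=2, 107=1}
        System.out.println("row for item 101: " + row101);
    }
}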
The third MR step:

It consists of two jobs with two different mappers. The first one, MR(31), re-keys MR(2)'s output and wraps each co-occurrence row in a VectorOrPrefWritable. MR(32) works on MR(1)'s output: for every item in a user's vector it emits the (user ID, preference) pair keyed by that item ID, also as a VectorOrPrefWritable. Both outputs therefore share the same key and value types and can later be combined per item.
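VectorOrPrefWritable (from org.apache.mahout.cf.taste.hadoop.item) is essentially a tagged union: it carries either a vector or a (user ID, preference) pair. A tiny sketch of the two forms the mappers below produce (not from the original post; it assumes the Mahout 0.x jars are on the classpath and that the usual getters behave as named):

import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class VectorOrPrefSketch {
    public static void main(String[] args) {
        // form 1: a row of the co-occurrence matrix (what MR(31) emits)
        Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        cooccurrenceRow.set(102, 3.0);
        VectorOrPrefWritable matrixSide = new VectorOrPrefWritable(cooccurrenceRow);

        // form 2: one user's preference for the item (what MR(32) emits)
        VectorOrPrefWritable prefSide = new VectorOrPrefWritable(1L, 5.0f);

        System.out.println(matrixSide.getVector());                           // the co-occurrence row
        System.out.println(prefSide.getUserID() + ":" + prefSide.getValue()); // 1:5.0
    }
}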
WiKiDriver31.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

// Job 3-1: wrap the co-occurrence rows produced by job 2 into VectorOrPrefWritable values.
public class WiKiDriver31 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver31 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job three1");
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver31.class);
        job1.setMapperClass(WikiMapper31.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(VectorOrPrefWritable.class);
        // a reducer is set only so that SequenceFileOutputFormat can be used
        job1.setReducerClass(WiKiReducer31.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(VectorOrPrefWritable.class);
        // this job's input is MR(2)'s output
        SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1); // exit on job failure
        }
    }
}
WikiMapper31.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VectorWritable;

// Mapper 3-1: simply re-wrap each co-occurrence row as a VectorOrPrefWritable.
public class WikiMapper31 extends Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    public void map(IntWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(key, new VectorOrPrefWritable(value.get()));
    }
}
WiKiReducer31.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

// Reducer 3-1: an identity reducer; it just writes every value straight back out.
public class WiKiReducer31 extends Reducer<IntWritable, VectorOrPrefWritable, IntWritable, VectorOrPrefWritable> {

    public void reduce(IntWritable key, Iterable<VectorOrPrefWritable> values, Context context)
            throws IOException, InterruptedException {
        for (VectorOrPrefWritable va : values) {
            context.write(key, va);
        }
    }
}
WiKiDriver32.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

// Job 3-2: split the user vectors from job 1 into per-item (userID, preference) records.
public class WiKiDriver32 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver32 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job three2");
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver32.class);
        job1.setMapperClass(WikiMapper32.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(VectorOrPrefWritable.class);
        job1.setReducerClass(WiKiReducer32.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(VectorOrPrefWritable.class);
        // WiKiDriver1's output is this job's input
        SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1); // exit on job failure
        }
    }
}
WikiMapper32.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

// Mapper 3-2: for every item in a user's vector, emit <itemID, (userID, preference)>.
public class WikiMapper32 extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    public void map(VarLongWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        long userID = key.get();
        Vector userVector = value.get();
        Iterator<Vector.Element> it = userVector.iterateNonZero();
        IntWritable itemi = new IntWritable();
        while (it.hasNext()) {
            Vector.Element e = it.next();
            int itemIndex = e.index();
            float preferenceValue = (float) e.get();
            itemi.set(itemIndex);
            context.write(itemi, new VectorOrPrefWritable(userID, preferenceValue));
        }
    }
}



WiKiReducer32.java is exactly the same as WiKiReducer31.java, so it is not repeated here. Continued in "Hadoop 实现协同过滤 (example in <Mahout in action> chapter 6) Part 2".

  

  

  
