设为首页 收藏本站
查看: 1202|回复: 0

[经验分享] Hadoop实例:单轮MapReduce的矩阵乘法

[复制链接]

尚未签到

发表于 2016-12-10 07:10:14 | 显示全部楼层 |阅读模式
  最近开始在看@王斌_ICTIR老师的《大数据:互联网大规模数据挖掘与分布式处理》,下面是对第二章提到的的单轮计算矩阵乘法进行的学习实现过程。
   
       矩阵的乘法只有在第一个矩阵的列数(column)和第二个矩阵的行数(row)相同时才有定义。一般单指矩阵乘积时,指的便是一般矩阵乘积。若A为i×r矩阵,B为r×j矩阵,则他们的乘积AB(有时记做A · B)会是一个i×j矩阵。其乘积矩阵的元素如下面式子得出:
   DSC0000.jpg
      
       书中提到的对矩阵乘法的MapReduce实现方法是:
      Map函数:对于矩阵M的每个元素M[i,j],产生一系列的键值对(i,k)->(M,j, M[i,j]),其中k=1,2…,直到矩阵N的列数。同样,对于矩阵N的每个元素N[j,k],产生一系列的键值对(i,k)->(N,j,N[j,k]),其中i=1,2…,直到矩阵M的行数。
      Reduce函数:根据MR的原理,相同键i,k的数据会发送个同一个 reduce。如果M为2*2矩阵,N为2×3矩阵,reduce函数需要处理的数据为:
  1,1->[(M,1, M[1,1])(M,2, M[1,2])、(N,1, N[1,1])、(N,2, N[2,1])]
  1,2->[(M,1, M[1,1])(M,2, M[1,2])、(N,1, N[1,2])、(N,2, N[2,2])]
  1,3->[(M,1, M[1,1])(M,2, M[1,2])、(N,1, N[1,3])、(N,2, N[2,3])],
  2,1->[(M,1, M[2,1])、(M,2, M[2,2])、(N,1, N[1,1])、(N,2, N[2,1])]
  2,2->[(M,1, M[2,1])、(M,2, M[2,2])、(N,1, N[1,2])、(N,2, N[2,2])]
  2,3->[(M,1, M[2,1])、(M,2, M[2,2])、(N,1, N[1,3])、(N,2, N[2,3])]
   
       这样只要将所有(M,j, M[i,j])和(N,j, N[j,k])分别按照j值排序并放在不同的两个列表里面。将这个列表的第j个元素M[i,j]个N[j,k]相乘,然后将这些积相加,最后积的和与键(i,k)组对作为reduce函数的输出。对于上面的例子reduce的输出就是:
  1,1->M[1,1]* N[1,1]+ M[1,2]* N[2,1]
  1,2->M[1,1]* N[1,2]+ M[1,2]* N[2,2]
  1,3->M[1,1]* N[1,3]+ M[1,2]* N[2,3]
  2,1->M[2,1]* N[2,1]+ M[2,2]* N[2,1]
  2,2->M[2,1]* N[1,2]+ M[2,2]* N[2,2]
  2,3->M[2,1]* N[1,3]+ M[2,2]* N[2,3]
   
       下面是MapReduce的实现步骤:
      (1).构造矩阵M300*150;矩阵N150*500。两矩阵的值放在一个M.data文件中,每行的格式为:文件标识#行坐标#列坐标#坐标值。
   DSC0001.jpg
     
      (2).基于上面的方法编写Map函数和Reduce函数。代码详见:
            https://github.com/intergret/snippet/blob/master/MartrixMultiplication.java
   DSC0002.jpg
      
      (3).将运行的结果文件copy到本地,并使用check.py对结果中元素[10,95]的正确性进行验证。
   DSC0003.jpg
1. [代码]MapReduce    
001import java.io.IOException;

002import org.apache.hadoop.conf.Configuration;

003import org.apache.hadoop.fs.Path;

004import org.apache.hadoop.io.Text;

005import org.apache.hadoop.mapreduce.Job;

006import org.apache.hadoop.mapreduce.Mapper;

007import org.apache.hadoop.mapreduce.Reducer;

008import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

009import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

010import org.apache.hadoop.util.GenericOptionsParser;

011 

012 

013public class MartrixMultiplication{

014 

015  public static class MartrixMapper extends Mapper<Object, Text, Text, Text>{

016     

017    private Text map_key = new Text();

018    private Text map_value = new Text();

019     

020    int rNumber = 300;

021    int cNumber = 500;

022    String fileTarget;

023    String i, j, k, ij, jk;

024     

025       

026    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

027       

028        String eachterm[] = value.toString().split("#");

029         

030        fileTarget = eachterm[0];

031         

032        if(fileTarget.equals("M")){

033            i = eachterm[1];

034            j = eachterm[2];

035            ij = eachterm[3];

036             

037            for(int c = 1; c<=cNumber; c++){

038                map_key.set(i + "#" + String.valueOf(c));

039                map_value.set("M" + "#" + j + "#" + ij);

040                context.write(map_key, map_value);

041            }

042             

043        }else if(fileTarget.equals("N")){

044            j = eachterm[1];

045            k = eachterm[2];

046            jk = eachterm[3];

047             

048            for(int r = 1; r<=rNumber; r++){

049                map_key.set(String.valueOf(r) + "#" +k);

050            map_value.set("N" + "#" + j + "#" + jk);

051            context.write(map_key, map_value);

052          }

053             

054        }

055    }

056  }

057   

058   

059  public static class MartrixReducer extends Reducer<Text,Text,Text,Text> {

060     

061    private Text reduce_value = new Text();

062     

063    int jNumber = 150;

064     

065      int M_ij[] = new int[jNumber+1];

066    int N_jk[] = new int[jNumber+1];

067     

068    int j, ij, jk;

069     

070    String fileTarget;

071    int jsum = 0;

072     

073    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

074       

075      jsum = 0;

076       

077      for (Text val : values) {

078          String eachterm[] = val.toString().split("#");

079           

080          fileTarget = eachterm[0];

081          j = Integer.parseInt(eachterm[1]);

082           

083          if(fileTarget.equals("M")){

084              ij = Integer.parseInt(eachterm[2]);

085              M_ij[j] = ij;

086          }else if(fileTarget.equals("N")){

087              jk = Integer.parseInt(eachterm[2]);

088              N_jk[j] = jk;

089        }

090           

091      }

092       

093       

094      for(int d = 1; d<=jNumber; d++){

095          jsum +=  M_ij[d] * N_jk[d];

096      }

097       

098      reduce_value.set(String.valueOf(jsum));

099        context.write(key, reduce_value);

100       

101    }

102  }

103   

104 

105  public static void main(String[] args) throws Exception {

106       

107        Configuration conf = new Configuration();

108        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

109        if (otherArgs.length != 2) {

110          System.err.println("Usage: MartrixMultiplication <in> <out>");

111          System.exit(2);

112        }

113         

114        Job job = new Job(conf, "martrixmultiplication");

115        job.setJarByClass(MartrixMultiplication.class);

116        job.setMapperClass(MartrixMapper.class);

117        job.setReducerClass(MartrixReducer.class);

118         

119        job.setOutputKeyClass(Text.class);

120        job.setOutputValueClass(Text.class);

121         

122        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

123        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

124         

125        System.exit(job.waitForCompletion(true) ? 0 : 1);

126                     

127  }

128   

129}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312016-1-1.html 上篇帖子: windows环境下使用MyEclipse调用hadoop集群 下篇帖子: hadoop 0.23 配置(启动、跑mapRedcue、web UI)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表