设为首页 收藏本站
查看: 959|回复: 0

[经验分享] Hadoop MapReduce进阶 使用DataJoin包实现Join

[复制链接]

尚未签到

发表于 2016-12-10 10:22:38 | 显示全部楼层 |阅读模式
概念:
  Hadoop有一个叫DataJoin的包为Data Join提供相应的框架。它的Jar包存在于contrib/datajoin/hadoop-*-datajoin。
  为区别于其他的data join技术,我们称其为reduce-side join。(因为我们在reducer上作大多数的工作)
  reduce-side join引入了一些术语及概念:
  1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式)
  Customers                 Orders
      1,Stephanie Leung,555-555-5555      3,A,12.95,02-Jun-2008
      2,Edward Kim,123-456-7890        1,B,88.25,20-May-2008
      3,Jose Madriz,281-330-8004        2,C,32.00,30-Nov-2007
      4,David Stork,408-555-0000         3,D,25.02,22-Jan-2009
  2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中。在这个目的下,我们将使用每个record自身的Data source名称标记每个record。
  3.Group Key:Group Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Customer ID(第一列的3)。由于datajoin包允许用户自定义group key,所以其较之关系数据库中的join key更一般、平常。
  

流程:(详见《Hadoop in Action》Chapter 5.2)
Advanced MapReduce:

DSC0000.gif



Joining Data from different sources:

DSC0001.gif



  

利用datajoin包来实现join:
  Hadoop的datajoin包中有三个需要我们继承的类:DataJoinMapperBase,DataJoinReducerBase,TaggedMapOutput。正如其名字一样,我们的MapClass将会扩展DataJoinMapperBase,Reduce类会扩展DataJoinReducerBase。这个datajoin包已经实现了map()和reduce()方法,因此我们的子类只需要实现一些新方法来设置一些细节。
  在用DataJoinMapperBase和DataJoinReducerBase之前,我们需要弄清楚我们贯穿整个程序使用的新的虚数据类TaggedMapOutput。
  根据之前我们在图Advance MapReduce的数据流中所展示的那样,mapper输出一个包(由一个key和一个value(tagged record)组成)。datajoin包将key设置为Text类型,将value设置为TaggedMapOutput类型(TaggedMapOutput是一个将我们的记录使用一个Text类型的tag包装起来的数据类型)。它实现了getTag()和setTag(Text tag)方法。它还定义了一个getData()方法,我们的子类将实现这个方法来处理record记录。我们并没有明确地要求子类实现setData()方法,但我们最好还是实现这个方法以实现程序的对称性(或者在构造函数中实现)。作为Mapper的输出,TaggedMapOutput需要是Writable类型,因此的子类还需要实现readFields()和write()方法。
  

DataJoinMapperBase:
  回忆join数据流图,mapper的主要功能就是打包一个record使其能够和其他拥有相同group key的记录去向一个Reducer。DataJoinMapperBase完成所有的打包工作,这个类定义了三个虚类让我们的子类实现:
  protected abstract Text generateInputTag(String inputFile);
  protected abstract TaggedMapOutput generateTaggedMapOutut(Object value);
  protected abstract Text generateGroupKey(TaggedMapOutput aRecored);
  在一个map任务开始之前为所有这个map任务会处理的记录定义一个tag(Text),结果将保存到DataJoinMapperBase的inputTag变量中,我们也可以保存filename至inputFile变量中以待后用。
  

  在map任务初始化之后,DataJoinMapperBase的map()方法会对每一个记录执行。它调用了两个我们还没有实现的虚方法:generateTaggedMapOutput()以及generateGroupKey(aRecord);(详见代码)
  

DataJoinReducerBase:
  DataJoinMapperBase将我们所需要做的工作以一个full outer join的方式简化。我们的Reducer子类只需要实现combine()方法来滤除掉我们不需要的组合来得到我们需要的(inner join, left outer join等)。同时我们也在combiner()中将我们的组合格式化为输出格式。
  

代码:
[java] view plaincopyprint?


  • importjava.io.DataInput;
  • importjava.io.DataOutput;
  • importjava.io.IOException;
  • importjava.util.Iterator;

  • importorg.apache.hadoop.conf.Configuration;
  • importorg.apache.hadoop.conf.Configured;
  • importorg.apache.hadoop.fs.Path;
  • importorg.apache.hadoop.io.Text;
  • importorg.apache.hadoop.io.Writable;
  • importorg.apache.hadoop.mapred.FileInputFormat;
  • importorg.apache.hadoop.mapred.FileOutputFormat;
  • importorg.apache.hadoop.mapred.JobClient;
  • importorg.apache.hadoop.mapred.JobConf;
  • importorg.apache.hadoop.mapred.KeyValueTextInputFormat;
  • importorg.apache.hadoop.mapred.MapReduceBase;
  • importorg.apache.hadoop.mapred.Mapper;
  • importorg.apache.hadoop.mapred.OutputCollector;
  • importorg.apache.hadoop.mapred.Reducer;
  • importorg.apache.hadoop.mapred.Reporter;
  • importorg.apache.hadoop.mapred.TextInputFormat;
  • importorg.apache.hadoop.mapred.TextOutputFormat;
  • importorg.apache.hadoop.util.Tool;
  • importorg.apache.hadoop.util.ToolRunner;

  • importorg.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
  • importorg.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
  • importorg.apache.hadoop.contrib.utils.join.TaggedMapOutput;

  • publicclassDataJoinextendsConfiguredimplementsTool{

  • publicstaticclassMapClassextendsDataJoinMapperBase{

  • protectedTextgenerateInputTag(StringinputFile){
  • Stringdatasource=inputFile.split("-")[0];
  • returnnewText(datasource);
  • }

  • protectedTextgenerateGroupKey(TaggedMapOutputaRecord){
  • Stringline=((Text)aRecord.getData()).toString();
  • String[]tokens=line.split(",");
  • StringgroupKey=tokens[0];
  • returnnewText(groupKey);
  • }

  • protectedTaggedMapOutputgenerateTaggedMapOutput(Objectvalue){
  • TaggedWritableretv=newTaggedWritable((Text)value);
  • retv.setTag(this.inputTag);
  • returnretv;
  • }
  • }

  • publicstaticclassReduceextendsDataJoinReducerBase{

  • protectedTaggedMapOutputcombine(Object[]tags,Object[]values){
  • if(tags.length<2)returnnull;
  • StringjoinedStr="";
  • for(inti=0;i<values.length;i++){
  • if(i>0)joinedStr+=",";
  • TaggedWritabletw=(TaggedWritable)values;
  • Stringline=((Text)tw.getData()).toString();
  • String[]tokens=line.split(",",2);
  • joinedStr+=tokens[1];
  • }
  • TaggedWritableretv=newTaggedWritable(newText(joinedStr));
  • retv.setTag((Text)tags[0]);
  • returnretv;
  • }
  • }

  • publicstaticclassTaggedWritableextendsTaggedMapOutput{

  • privateWritabledata;

  • publicTaggedWritable(Writabledata){
  • this.tag=newText("");
  • this.data=data;
  • }

  • publicWritablegetData(){
  • returndata;
  • }

  • publicvoidwrite(DataOutputout)throwsIOException{
  • this.tag.write(out);
  • this.data.write(out);
  • }

  • publicvoidreadFields(DataInputin)throwsIOException{
  • this.tag.readFields(in);
  • this.data.readFields(in);
  • }
  • }

  • publicintrun(String[]args)throwsException{
  • Configurationconf=getConf();

  • JobConfjob=newJobConf(conf,DataJoin.class);

  • Pathin=newPath(args[0]);
  • Pathout=newPath(args[1]);
  • FileInputFormat.setInputPaths(job,in);
  • FileOutputFormat.setOutputPath(job,out);

  • job.setJobName("DataJoin");
  • job.setMapperClass(MapClass.class);
  • job.setReducerClass(Reduce.class);

  • job.setInputFormat(TextInputFormat.class);
  • job.setOutputFormat(TextOutputFormat.class);
  • job.setOutputKeyClass(Text.class);
  • job.setOutputValueClass(TaggedWritable.class);
  • job.set("mapred.textoutputformat.separator",",");

  • JobClient.runJob(job);
  • return0;
  • }

  • publicstaticvoidmain(String[]args)throwsException{
  • intres=ToolRunner.run(newConfiguration(),
  • newDataJoin(),
  • args);

  • System.exit(res);
  • }
  • }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312264-1-1.html 上篇帖子: (转载)基于Eclipse的Hadoop应用开发环境的配置 下篇帖子: Hadoop中的集群配置和使用技巧── 分布式计算开源框架Hadoop入门实践(二)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表