设为首页 收藏本站
查看: 1596|回复: 0

[经验分享] Hadoop的ChainMapper和ChainReducer使用案例(链式处理)(四)

[复制链接]

尚未签到

发表于 2017-12-18 12:16:40 | 显示全部楼层 |阅读模式
  不多说,直接上干货!
  Hadoop的MR作业支持链式处理,类似在一个生产牛奶的流水线上,每一个阶段都有特定的任务要处理,比如提供牛奶盒,装入牛奶,封盒,打印出厂日期,等等,通过这样进一步的分工,从而提高了生产效率,那么在我们的Hadoop的MapReduce中也是如此,支持链式的处理方式,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,而这一点与Lucene和Solr中的Filter机制是非常类似的,Hadoop项目源自Lucene,自然也借鉴了一些Lucene中的处理方式。
  举个例子,比如处理文本中的一些禁用词,或者敏感词,等等,Hadoop里的链式操作,支持的形式类似正则Map+ Rrduce Map*,代表的意思是全局只能有一个唯一的Reduce,但是在Reduce的前后是可以存在无限多个Mapper来进行一些预处理或者善后工作的。
  注意:
  1. 本人目前使用的版本是1.2.1,因此ChainMapper使用的还是old api。
  2. 老的API之中,只支持 N-Mapper + 1-Reducer的模式。 Reducer不在链式任务最开始即可。
  比如:
  Map1 -> Map2 -> Reducer -> Map3 -> Map4
  (不确定在新版的API之中是否支持 N-Reducer的模式。不过new api 确实要简单简洁很多)
DSC0000.png

  在编程的时候,我们可以借用源码提供给我们的程序!在此基础上进行修改和编写。
  比如我的源码本地目录如下:(找我的本地ChainMapper和ChainReducer案例
  D:\SoftWare\hadoop-2.2.0-src\hadoop-mapreduce-project\hadoop-mapreduce-client\hadoop-mapreduce-client-core\src\main\java\org\apache\hadoop\mapreduce\lib\chain
DSC0001.png

  任务介绍:
  这个任务需要两步完成:
  1. 对一篇文章进行WordCount
  2. 统计出现次数超过5词的单词
  WordCount我们很熟悉,因为版本限制,先使用old api 实现一次:
  Java代码

  • package hadoop_in_action_exersice;

  • import java.io.IOException;
  • import java.util.Iterator;
  • import java.util.StringTokenizer;

  • import org.apache.hadoop.fs.FileSystem;
  • import org.apache.hadoop.fs.Path;
  • import org.apache.hadoop.io.IntWritable;
  • import org.apache.hadoop.io.LongWritable;
  • import org.apache.hadoop.io.Text;
  • import org.apache.hadoop.mapred.FileInputFormat;
  • import org.apache.hadoop.mapred.FileOutputFormat;
  • import org.apache.hadoop.mapred.JobClient;
  • import org.apache.hadoop.mapred.JobConf;
  • import org.apache.hadoop.mapred.MapReduceBase;
  • import org.apache.hadoop.mapred.Mapper;
  • import org.apache.hadoop.mapred.OutputCollector;
  • import org.apache.hadoop.mapred.Reducer;
  • import org.apache.hadoop.mapred.Reporter;
  • import org.apache.hadoop.mapred.TextInputFormat;
  • import org.apache.hadoop.mapred.TextOutputFormat;

  • public class ChainedJobs {

  •     public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

  •         private final static IntWritable one = new IntWritable(1);
  •         public static final int LOW_LIMIT = 5;
  •         @Override
  •         public void map(LongWritable key, Text value,
  •                 OutputCollector<Text, IntWritable> output, Reporter reporter)
  •                 throws IOException {
  •             String line = value.toString();
  •             StringTokenizer st = new StringTokenizer(line);
  •             while(st.hasMoreTokens())
  •                 output.collect(new Text(st.nextToken()), one);

  •         }

  •     }

  •     public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

  •         @Override
  •         public void reduce(Text key, Iterator<IntWritable> values,
  •                 OutputCollector<Text, IntWritable> output, Reporter reporter)
  •                 throws IOException {
  •             int sum = 0;
  •             while(values.hasNext()) {
  •                 sum += values.next().get();
  •             }
  •             output.collect(key, new IntWritable(sum));
  •         }

  •     }


  •     public static void main(String[] args) throws IOException {


  •         JobConf conf = new JobConf(ChainedJobs.class);
  •         conf.setJobName("wordcount");           //设置一个用户定义的job名称
  •         conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类
  •         conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类
  •         conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类
  •         conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类
  •         conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类
  •         conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类
  •         conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类

  •         // Remove output folder before run job(s)
  •         FileSystem fs=FileSystem.get(conf);
  •         String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
  •         Path op=new Path(outputPath);
  •         if (fs.exists(op)) {
  •             fs.delete(op, true);
  •             System.out.println("存在此输出路径,已删除!!!");
  •         }

  •         FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
  •         FileOutputFormat.setOutputPath(conf, new Path(outputPath));
  •         JobClient.runJob(conf);         //运行一个job
  •     }

  • }
  上面是独立的一个Job,完成第一步。为了能紧接着完成第二步,我们需要在原来的基础上进行修改。
  为了方便理解,上面的输入的例子如下:
  Java代码

  • accessed    3
  • accessible  4
  • accomplish  1
  • accounting  7
  • accurately  1
  • acquire 1
  • across  1
  • actual  1
  • actually    1
  • add 3
  • added   2
  • addition    1
  • additional  4
  old api 的实现方式并不支持 setup() / cleanup() 操作这一点非常不好,因此在有可能的情况下最好还是要迁移到Hadoop 2.X
  新的API会方便简洁很多
  下面是增加了一个Mapper 来过滤
  Java代码

  • public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

  •     @Override
  •     public void map(Text key, IntWritable value,
  •             OutputCollector<Text, IntWritable> output, Reporter reporter)
  •             throws IOException {

  •         if(value.get() >= LOW_LIMIT) {
  •             output.collect(key, value);
  •         }

  •     }
  • }
  这个Mapper做的事情很简单,就是针对每个key,如果他的value > LOW_LIMIT 那么就输出
  所以,目前为止,任务链如下:
  TokenizerMapper -> TokenizeReducer -> RangeFilterMapper
  所以我们的main函数改成下面的样子:
  Java代码

  • public static void main(String[] args) throws IOException {


  •     JobConf conf = new JobConf(ChainedJobs.class);
  •     conf.setJobName("wordcount");           //设置一个用户定义的job名称
  • //        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类
  • //        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类
  • //        conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类
  • //        conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类
  • //        conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类
  • //        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类
  • //        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类

  •     // Step1 : mapper forr word count
  •     JobConf wordCountMapper  = new JobConf(false);
  •     ChainMapper.addMapper(conf,
  •             TokenizeMapper.class,
  •             LongWritable.class,     // input key type
  •             Text.class,             // input value type
  •             Text.class,             // output key type
  •             IntWritable.class,      // output value type
  •             false,                  //byValue or byRefference 传值还是传引用
  •             wordCountMapper);

  •     // Step2: reducer for word count
  •     JobConf wordCountReducer  = new JobConf(false);
  •     ChainReducer.setReducer(conf,
  •             TokenizeReducer.class,
  •             Text.class,
  •             IntWritable.class,
  •             Text.class,
  •             IntWritable.class,
  •             false,
  •             wordCountReducer);

  •         // Step3: mapper used as filter
  •     JobConf rangeFilterMapper  = new JobConf(false);
  •     ChainReducer.addMapper(conf,
  •             RangeFilterMapper.class,
  •             Text.class,
  •             IntWritable.class,
  •             Text.class,
  •             IntWritable.class,
  •             false,
  •             rangeFilterMapper);


  •     // Remove output folder before run job(s)
  •     FileSystem fs=FileSystem.get(conf);
  •     String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
  •     Path op=new Path(outputPath);
  •     if (fs.exists(op)) {
  •         fs.delete(op, true);
  •         System.out.println("存在此输出路径,已删除!!!");
  •     }

  •     FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
  •     FileOutputFormat.setOutputPath(conf, new Path(outputPath));
  •     JobClient.runJob(conf);         //运行一个job
  • }
  下面是运行结果的一部分:
  Java代码

  • a   40
  • and 26
  • are 12
  • as  6
  • be  7
  • been    8
  • but 5
  • by  5
  • can 12
  • change  5
  • data    5
  • files   7
  • for 28
  • from    5
  • has 7
  • have    8
  • if  6
  • in  27
  • is  16
  • it  13
  • more    8
  • not 5
  • of  23
  • on  5
  • outputs 5
  • see 6
  • so  11
  • that    11
  • the 54
  可以看到,英文之中,如果NLP不去除停用词(a, the, for ...) 等,效果确实会被大大的影响。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425358-1-1.html 上篇帖子: Hadoop MapReduce编程 API入门系列之统计学生成绩版本2(十八) 下篇帖子: Hadoop MapReduce编程 API入门系列之join(二十六)(未完)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表