设为首页 收藏本站
查看: 1225|回复: 0

[经验分享] 【hadoop代码笔记】Mapreduce shuffle过程之Map输出过程

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-11 08:05:31 | 显示全部楼层 |阅读模式
  一、概要描述 shuffle是MapReduce的一个核心过程,因此没有在前面的MapReduce作业提交的过程中描述,而是单独拿出来比较详细的描述。 根据官方的流程图示如下:
DSC0000.png
  本篇文章中只是想尝试从代码分析来说明在map端是如何将map的输出保存下来等待reduce来取。 在执行每个map task时,无论map方法中执行什么逻辑,最终都是要把输出写到磁盘上。如果没有reduce阶段,则直接输出到hdfs上,如果有有reduce作业,则每个map方法的输出在写磁盘前线在内存中缓存。每个map task都有一个环状的内存缓冲区,存储着map的输出结果,默认100m,在每次当缓冲区快满的时候由一个独立的线程将缓冲区的数据以一个溢出文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有溢出文件做合并,被合并成已分区且已排序的输出文件。然后等待reduce task来拉数据。
  二、 流程描述


  • 在child进程调用到runNewMapper时,会设置output为NewOutputCollector,来负责map的输出。
  • 在map方法的最后,不管经过什么逻辑的map处理,最终一般都要调用到TaskInputOutputContext的write方法,进而调用到设置的output即NewOutputCollector的write方法
  • NewOutputCollector其实只是对MapOutputBuffer的一个封装,其write方法调用的是MapOutputBuffer的collect方法。
  • MapOutputBuffer的collect方法中把key和value序列化后存储在一个环形缓存中,如果缓存满了则会调用startspill方法设置信号量,使得一个独立的线程SpillThread可以对缓存中的数据进行处理。
  • SpillThread线程的run方法中调用sortAndSpill方法对缓存中的数据进行排序后写溢出文件。
  • 当map输出完成后,会调用output的close方法。
  • 在close方法中调用flush方法,对剩余的缓存进行处理,最后调用mergeParts方法,将前面过程的多个溢出文件合并为一个。
DSC0001.jpg
  Mapreduce shuffle过程之Map输出过程代码流程
  三、代码详细
  1 MapTask的runNewMapper方法 注意到有这样一段代码。即当job中只有map没有reduce的时候,这个rg.apache.hadoop.mapreduce.RecordWriter类型的对象 output是一Outputformat中定义的writer,即直接写到输出中。如果是有Reduce,则output是一个NewOutputCollector类型输出。
  



1 if (job.getNumReduceTasks() == 0) {
2         output = outputFormat.getRecordWriter(taskContext);
3       } else {
4         output = new NewOutputCollector(taskContext, job, umbilical, reporter);
5       }
6       mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),input, output, committer,                                                 reporter, split);
7       input.initialize(split, mapperContext);
8       mapper.run(mapperContext);
  
  和其他的RecordWriter一样,NewOutputCollector也继承自RecordWriter抽象类。除了一个close方法释放资源外,该抽象类定义的最主要的方法就一个void write(K key, V value)。即写入key,value。
2. Mapper的run方法,对每个输出执行map方法。




1 public void run(Context context) throws IOException, InterruptedException {
2     setup(context);
3     while (context.nextKeyValue()) {
4       map(context.getCurrentKey(), context.getCurrentValue(), context);
5     }
6     cleanup(context);
7 }
  
  3. Mapper的map方法,默认是直接把key和value写入



1  protected void map(KEYIN key, VALUEIN value,
2                      Context context) throws IOException, InterruptedException {
3     context.write((KEYOUT) key, (VALUEOUT) value);
4   }
  一般使用中会做很多我们需要的操作,如著名的wordcount中,把一行单词切分后,数一(value都设为one = new IntWritable(1)),但最终都是要把结果写入。即调用context.write(key,value)



1 public void map(Object key, Text value, Context context
2                     ) throws IOException, InterruptedException {
3       StringTokenizer itr = new StringTokenizer(value.toString());
4       while (itr.hasMoreTokens()) {
5         word.set(itr.nextToken());
6         context.write(word, one);
7       }
8     }
  4.TaskInputOutputContext的write方法。调用的是contex中的RecordWriter的write方法。即调用的是NewOutputCollector的write方法。



1 public void write(KEYOUT key, VALUEOUT value
2                     ) throws IOException, InterruptedException {
3     output.write(key, value);
4 }
  5.NewOutputCollector的write方法。



1 public void write(K key, V value) throws IOException, InterruptedException {
2       collector.collect(key, value,
3                         partitioner.getPartition(key, value, partitions));
4 }
  从方法名上不难看出提供写数据的是MapOutputCollector类型的 collector对象从NewOutputCollector的构造函数中看到collector的初始化。



collector = new MapOutputBuffer(umbilical, job, reporter);
  6.MapOutputBuffer的构造函数,在了解MapOutputBuffer的collect方法前,先了解下期构造函数,看做了哪些初始化。



1 public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
2                            TaskReporter reporter
3                            ) throws IOException, ClassNotFoundException {
4       this.job = job;
5       this.reporter = reporter;
6       localFs = FileSystem.getLocal(job);
7      //1)设定map的分区数,即作业 配置中的的reduce数
8       partitions = job.getNumReduceTasks();
9
10       rfs = ((LocalFileSystem)localFs).getRaw();
11
12       indexCacheList = new ArrayList();
13
14       //2)重要的参数
15       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
16       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
17       final int sortmb = job.getInt("io.sort.mb", 100);
18       if (spillper > (float)1.0 || spillper < (float)0.0) {
19         throw new IOException("Invalid \"io.sort.spill.percent\": " + spillper);
20       }
21       if (recper > (float)1.0 || recper < (float)0.01) {
22         throw new IOException("Invalid \"io.sort.record.percent\": " + recper);
23       }
24       if ((sortmb & 0x7FF) != sortmb) {
25         throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
26       }
27       //3)sorter,使用其对map的输出在partition内进行内排序。
28       sorter = ReflectionUtils.newInstance(
29             job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
30       LOG.info("io.sort.mb = " + sortmb);
31       // buffers and accounting
32      //把单位是M的sortmb设定左移20,还原单位为个
33       int maxMemUsage = sortmb

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85327-1-1.html 上篇帖子: Hadoop:The Definitive Guid 总结 Chapter 7 MapReduce的类型与格式 下篇帖子: [大牛翻译系列]Hadoop(1)MapReduce 连接:重分区连接(Repartition join)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表