|
一、概要描述 shuffle是MapReduce的一个核心过程,因此没有在前面的MapReduce作业提交的过程中描述,而是单独拿出来比较详细的描述。 根据官方的流程图示如下:
本篇文章中只是想尝试从代码分析来说明在map端是如何将map的输出保存下来等待reduce来取。 在执行每个map task时,无论map方法中执行什么逻辑,最终都是要把输出写到磁盘上。如果没有reduce阶段,则直接输出到hdfs上,如果有有reduce作业,则每个map方法的输出在写磁盘前线在内存中缓存。每个map task都有一个环状的内存缓冲区,存储着map的输出结果,默认100m,在每次当缓冲区快满的时候由一个独立的线程将缓冲区的数据以一个溢出文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有溢出文件做合并,被合并成已分区且已排序的输出文件。然后等待reduce task来拉数据。
二、 流程描述
- 在child进程调用到runNewMapper时,会设置output为NewOutputCollector,来负责map的输出。
- 在map方法的最后,不管经过什么逻辑的map处理,最终一般都要调用到TaskInputOutputContext的write方法,进而调用到设置的output即NewOutputCollector的write方法
- NewOutputCollector其实只是对MapOutputBuffer的一个封装,其write方法调用的是MapOutputBuffer的collect方法。
- MapOutputBuffer的collect方法中把key和value序列化后存储在一个环形缓存中,如果缓存满了则会调用startspill方法设置信号量,使得一个独立的线程SpillThread可以对缓存中的数据进行处理。
- SpillThread线程的run方法中调用sortAndSpill方法对缓存中的数据进行排序后写溢出文件。
- 当map输出完成后,会调用output的close方法。
- 在close方法中调用flush方法,对剩余的缓存进行处理,最后调用mergeParts方法,将前面过程的多个溢出文件合并为一个。
Mapreduce shuffle过程之Map输出过程代码流程
三、代码详细
1 MapTask的runNewMapper方法 注意到有这样一段代码。即当job中只有map没有reduce的时候,这个rg.apache.hadoop.mapreduce.RecordWriter类型的对象 output是一Outputformat中定义的writer,即直接写到输出中。如果是有Reduce,则output是一个NewOutputCollector类型输出。
1 if (job.getNumReduceTasks() == 0) {
2 output = outputFormat.getRecordWriter(taskContext);
3 } else {
4 output = new NewOutputCollector(taskContext, job, umbilical, reporter);
5 }
6 mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),input, output, committer, reporter, split);
7 input.initialize(split, mapperContext);
8 mapper.run(mapperContext);
和其他的RecordWriter一样,NewOutputCollector也继承自RecordWriter抽象类。除了一个close方法释放资源外,该抽象类定义的最主要的方法就一个void write(K key, V value)。即写入key,value。
2. Mapper的run方法,对每个输出执行map方法。
1 public void run(Context context) throws IOException, InterruptedException {
2 setup(context);
3 while (context.nextKeyValue()) {
4 map(context.getCurrentKey(), context.getCurrentValue(), context);
5 }
6 cleanup(context);
7 }
3. Mapper的map方法,默认是直接把key和value写入
1 protected void map(KEYIN key, VALUEIN value,
2 Context context) throws IOException, InterruptedException {
3 context.write((KEYOUT) key, (VALUEOUT) value);
4 }
一般使用中会做很多我们需要的操作,如著名的wordcount中,把一行单词切分后,数一(value都设为one = new IntWritable(1)),但最终都是要把结果写入。即调用context.write(key,value)
1 public void map(Object key, Text value, Context context
2 ) throws IOException, InterruptedException {
3 StringTokenizer itr = new StringTokenizer(value.toString());
4 while (itr.hasMoreTokens()) {
5 word.set(itr.nextToken());
6 context.write(word, one);
7 }
8 }
4.TaskInputOutputContext的write方法。调用的是contex中的RecordWriter的write方法。即调用的是NewOutputCollector的write方法。
1 public void write(KEYOUT key, VALUEOUT value
2 ) throws IOException, InterruptedException {
3 output.write(key, value);
4 }
5.NewOutputCollector的write方法。
1 public void write(K key, V value) throws IOException, InterruptedException {
2 collector.collect(key, value,
3 partitioner.getPartition(key, value, partitions));
4 }
从方法名上不难看出提供写数据的是MapOutputCollector类型的 collector对象从NewOutputCollector的构造函数中看到collector的初始化。
collector = new MapOutputBuffer(umbilical, job, reporter);
6.MapOutputBuffer的构造函数,在了解MapOutputBuffer的collect方法前,先了解下期构造函数,看做了哪些初始化。
1 public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
2 TaskReporter reporter
3 ) throws IOException, ClassNotFoundException {
4 this.job = job;
5 this.reporter = reporter;
6 localFs = FileSystem.getLocal(job);
7 //1)设定map的分区数,即作业 配置中的的reduce数
8 partitions = job.getNumReduceTasks();
9
10 rfs = ((LocalFileSystem)localFs).getRaw();
11
12 indexCacheList = new ArrayList();
13
14 //2)重要的参数
15 final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
16 final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
17 final int sortmb = job.getInt("io.sort.mb", 100);
18 if (spillper > (float)1.0 || spillper < (float)0.0) {
19 throw new IOException("Invalid \"io.sort.spill.percent\": " + spillper);
20 }
21 if (recper > (float)1.0 || recper < (float)0.01) {
22 throw new IOException("Invalid \"io.sort.record.percent\": " + recper);
23 }
24 if ((sortmb & 0x7FF) != sortmb) {
25 throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
26 }
27 //3)sorter,使用其对map的输出在partition内进行内排序。
28 sorter = ReflectionUtils.newInstance(
29 job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
30 LOG.info("io.sort.mb = " + sortmb);
31 // buffers and accounting
32 //把单位是M的sortmb设定左移20,还原单位为个
33 int maxMemUsage = sortmb |
|