|
一、回顾单词统计源码
1 package counter;
2
3 import java.net.URI;
4
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.FileSystem;
7 import org.apache.hadoop.fs.Path;
8 import org.apache.hadoop.io.LongWritable;
9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19
20 public class WordCountApp {
21 static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
22 static final String OUT_PATH = "hdfs://hadoop:9000/out";
23
24 public static void main(String[] args) throws Exception {
25
26 Configuration conf = new Configuration();
27
28 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
29 final Path outPath = new Path(OUT_PATH);
30
31 if(fileSystem.exists(outPath)){
32 fileSystem.delete(outPath, true);
33 }
34
35 final Job job = new Job(conf , WordCountApp.class.getSimpleName());
36
37 FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里
38
39 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
40
41 job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类
42 job.setMapOutputKeyClass(Text.class);//map输出的类型。如果的类型与类型一致,则可以省略
43 job.setMapOutputValueClass(LongWritable.class);
44
45 job.setPartitionerClass(HashPartitioner.class);//1.3 分区
46 job.setNumReduceTasks(1);//有一个reduce任务运行
47
48 job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类
49 job.setOutputKeyClass(Text.class);//指定reduce的输出类型
50 job.setOutputValueClass(LongWritable.class);
51
52 FileOutputFormat.setOutputPath(job, outPath);//2.3 指定写出到哪里
53
54 job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类
55
56 job.waitForCompletion(true);//把job提交给JobTracker运行
57 }
58
59 /**
60 * KEYIN 即k1 表示行的偏移量
61 * VALUEIN 即v1 表示行文本内容
62 * KEYOUT 即k2 表示行中出现的单词
63 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1
64 */
65 static class MyMapper extends Mapper{
66 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
67 // final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
68
69 final String line = v1.toString();
70 /* if(line.contains("hello")){
71 //记录敏感词出现在一行中
72 helloCounter.increment(1L);
73 }*/
74 final String[] splited = line.split(" ");
75 for (String word : splited) {
76 context.write(new Text(word), new LongWritable(1));
77 }
78 };
79 }
80
81 /**
82 * KEYIN 即k2 表示行中出现的单词
83 * VALUEIN 即v2 表示行中出现的单词的次数
84 * KEYOUT 即k3 表示文本中出现的不同单词
85 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数
86 *
87 */
88 static class MyReducer extends Reducer{
89 protected void reduce(Text k2, java.lang.Iterable v2s, Context ctx) throws java.io.IOException ,InterruptedException {
90 long times = 0L;
91 for (LongWritable count : v2s) {
92 times += count.get();
93 }
94 ctx.write(k2, new LongWritable(times));
95 };
96 }
97
98 }
View Code 代码1.1
二、原理与代码解析
2.1 MapReduce的任务与原理
2.1.1 MapReduce的工作原理
MapReduce的工作原理如下图2.1所示。
图 2.1
在图中我们已看出,关于File有两种划分,一个是split分片一个是block,注意分片只是逻辑划分,并不是像划分block那样,将文件真是的划分为多个部分,他只是逻辑上的的划分,可以说是只是读取时候按分片来读取。关于分片的大小默认为块大小,为什么要这样呢?那因为MapReduce作业 处理的文件是存放在DataNode上的,而且文件在DataNode上是按block存放的,而不同的block可是存放在不同的DataNode上的,如果分片大小大于block块大小,那么说明一个块满足不 了该分片,那么就需要再读取一个block块,这样当这两个block块位于不同的DataNode上 时,就要通过网络访问另一个节点,这样就可能造成网络延迟影响Mapreduce的执行效率,所以一般分片大小会默认为block块大小。
在分析一下该图,不难看出,每一个split都分配了一个MappperTask,每个MapperTask又有三个箭头,有三个不同的走向表示分了三个区,那就有三个ReducerTask,而最终的结果会分不同的痛的部分存放在DataNode目录中。我们也可以对比下面这张图来对比理解MapReduce的工作原理,如图2.2所示。
图 2.2
2.1.2 map()和reduce的任务
map任务处理
1) 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
2) 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
3) 对输出的key、value进行分区。
4) 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
reduce任务处理
1) 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2) 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
3) 把reduce的输出保存到文件中。
2.2源码任务的对比分析
关于任务和源码的对应分析主要是针对map的第一项和第二项任务。第一项任务是:读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。第二项任务是:写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
2.2.1 第一项任务
从上面代码1.1中,可以看出这项任务是由下面这段代码来完成,如代码2.1所示。
1 FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里
2 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
代码 2.1
分析这段代码,可以知道,由代码中的TextInputFromat这个类主要是来完成分割的任务的,下面先来看一下该类的树结构,如下图2.1所示。
图 2.2
从图中可知,TextInputFormat的继承的关系为,TextInputFormat--->FileInputformat--->InputFormat,那么看进入TextInputFormat类,看一下该类的注释,和其中的方法,如下代码2.2,2.3,注释中的@link表示后面跟的是一个连接,可以点击查看。
1 * InputFormat describes the input-specification for a InputFormat用来描述Map-Reduce的输入规格
2 * Map-Reduce job.
3 *
4 * The Map-Reduce framework relies on the InputFormat of the Map-reduce框架依赖于一个job的InputFormat
5 * job to:
6 *
7 *
8 * Validate the input-specification of the job. 验证job的输入规格
9 *
10 * Split-up the input file(s) into logical {@link InputSplit}s, each of 把输入文件拆分成逻辑Inputsplit,每一个
11 * which is then assigned to an individual {@link Mapper}. InputSplit都会被分配到一个独立的Mapper
12 *
13 *
14 * Provide the {@link RecordReader} implementation to be used to glean 提供实现类RcordReader,用于为Mapper任务,从逻辑InputSplit
15 * input records from the logical InputSplit for processing by 收集输入记录。
16 * the {@link Mapper}.
17 *
18 *
View Code 代码 2.2
1 /**
2 * Logically split the set of input files for the job. 为job逻辑切分输入文件
3 * @param context job configuration.
4 * @return an array of {@link InputSplit}s for the job.
5 */
6 public abstract
7 List getSplits(JobContext context
8 ) throws IOException, InterruptedException;
9
10 /**
11 * Create a record reader for a given split. The framework will call 为分片创建一个记录读取器
12 * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
13 * the split is used.
14 * @param split the split to be read
15 * @param context the information about the task
16 * @return a new record reader
17 * @throws IOException
18 * @throws InterruptedException
19 */
20 public abstract
21 RecordReader createRecordReader(InputSplit split,
22 TaskAttemptContext context
23 ) throws IOException,
24 InterruptedException;
View Code 代码 2.3
从上面的代码中可以知道InputFormat是一个抽象类,两面有两个抽象方法getSplit和createRecordReader,由于抽象类中只有方法的声明,并没有方法的实现,所以要分析该类的实现类FileInputFormat,在该实现类中,实现了他的父类InputFormat的getSplits()方法,查看该类的源码及注释如下代码2.4所示。
1 /**
2 * Generate the list of files and make them into FileSplits. 生成一个文件列表并创建FileSplits
3 */
4 public List getSplits(JobContext job
5 ) throws IOException {
6 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //该值等于1
7 long maxSize = getMaxSplitSize(job); //该值等于263-1
8 // generate splits
9 List splits = new ArrayList();
10 List files = listStatus(job); //读取文件夹下的所有文件
11 for (FileStatus file: files) { //遍历文件夹下的所有文件
12 Path path = file.getPath(); //获取文件路径
13 FileSystem fs = path.getFileSystem(job.getConfiguration()); //根据该路径获取文件系统
14 long length = file.getLen(); //文件长度
15 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); //块位置
16 if ((length != 0) && isSplitable(job, path)) { //判断文件数量是否不为空且文件允许被拆分
17 long blockSize = file.getBlockSize();
18 long splitSize = computeSplitSize(blockSize, minSize, maxSize); //计算分片大小,该分片大小和blockSize, minSize, maxSize有关系,默认为block块大小
19 long bytesRemaining = length; //文件初始长度
20 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //分片
21 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
22 splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
23 blkLocations[blkIndex].getHosts()));
24 bytesRemaining -= splitSize;
25 }
26
27 if (bytesRemaining != 0) {
28 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
29 blkLocations[blkLocations.length-1].getHosts()));
30 }
31 } else if (length != 0) { //如果该文件不能够被切分就,就直接生成分片
32 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
33 } else {
34 //Create empty hosts array for zero length files
35 splits.add(new FileSplit(path, 0, length, new String[0]));
36 }
37 }
38 // Save the number of input files in the job-conf
39 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
40 LOG.debug("Total # of splits: " + splits.size());
41 return splits;
42 }
View Code 代码 2.4
注意,分片FileinputSplit只是逻辑划分,并不是像划分block那样,将文件真是的划分为多个部分,他只是逻辑上的的划分,可以说是只是读取时候按分片来读取,分片InputSplit大小默认为块大小,为什么要这样呢?那因为MapReduce作业 处理的文件是存放在datanode上的,而且文件在DataNode上是按block存放的,如果分片大小大于block块大小,那么说明一个块满足不了该分片需要再读取一个block块,而不同的block可是存放在不同的DataNode上的,这样当这两个block块位于不同的DataNode上时,就要通过网络访问另一个节点,这样就可能造成网络延迟影响Mapre-duce的执行效率,所以一般分片大小会默认为block块大小。
我们知道FileInputFormat实现了,inputFormat的 getSplits()的抽象方法,那么另一个抽象方法createRecordReader由谁来实现呢,我们看一下该类的两个实现类FileIn putFormat和TextInputFormat这两个实现类的源码,看一发现createRecordReader是在TextInputFormat这个实现类中实现的,我们看一下该类的源码如下代码2.5所示。
1 /** An {@link InputFormat} for plain text files. Files are broken into lines. 文件被解析成行
2 * Either linefeed or carriage-return are used to signal end of line. Keys are 无论是换行符还是回车符都表示一行结束
3 * the position in the file, and values are the line of text.. */ 键是该行在文件中的位置,值为该行的内容
4 public class TextInputFormat extends FileInputFormat {
5
6 @Override
7 public RecordReader
8 createRecordReader(InputSplit split,
9 TaskAttemptContext context) {
10 return new LineRecordReader();
11 }
12
13 @Override
14 protected boolean isSplitable(JobContext context, Path file) {
15 CompressionCodec codec =
16 new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
17 if (null == codec) {
18 return true;
19 }
20 return codec instanceof SplittableCompressionCodec;
21 }
22
23 }
View Code 代码2.5
我们再分析一下createRecordReader()方法的返回值,他的返回值类型为RecordReader,返回值是new LineRecordReader (),而他继承了RecordReader,我们先看一下RecordReader源码如代码2.6所示。
1 package org.apache.hadoop.mapreduce;
2
3 import java.io.Closeable;
4 import java.io.IOException;
5
6 /**
7 * The record reader breaks the data into key/value pairs for input to the 将数据解析成Mapper能够处理的键值对
8 * {@link Mapper}.
9 * @param
10 * @param
11 */
12 public abstract class RecordReader implements Closeable {
13
14 /**
15 * Called once at initialization.
16 * @param split the split that defines the range of records to read
17 * @param context the information about the task
18 * @throws IOException
19 * @throws InterruptedException
20 */
21 public abstract void initialize(InputSplit split,
22 TaskAttemptContext context
23 ) throws IOException, InterruptedException;
24
25 /**
26 * Read the next key, value pair.
27 * @return true if a key/value pair was read
28 * @throws IOException
29 * @throws InterruptedException
30 */
31 public abstract boolean nextKeyValue() throws IOException, InterruptedException;
32
33 public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
34 public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
35 public abstract float getProgress() throws IOException, InterruptedException;
36 public abstract void close() throws IOException;
37 }
View Code 代码 2.6
从上面的代码中我们可以发现,RecordReader类是一个抽象类,其中的抽象方法initialize(),主要是用来将内容解析成键值对的,nextKeyValue(), getCurrentKey() ,getCurrentValue() 主要是用来获取键值对的内容的,他们的使用方法如下面代码2.7所示。
1 while(xxx.nextKeyValue()){
2 key=xxx.getCurrenKey();
3 value=xxx.getCurrentValue();
4 }
代码 2.7
从RecordReader的类中回到 LineRecordReader类我们可以看到,该类对RecordReader类的三个抽象方法nextKeyValue(), getCurrentKey(),getCurrentValue()进行了实现,LineRecordReader类源码如代码2.8所示。
1 public boolean nextKeyValue() throws IOException {
2 if (key == null) {
3 key = new LongWritable();
4 }
5 key.set(pos); //第一次调用时pos为零,key也就为零,key表示该行的偏移量
6 if (value == null) {
7 value = new Text();
8 }
9 int newSize = 0; //表示当前读取的字节数
10 // We always read one extra line, which lies outside the upper
11 // split limit i.e. (end - 1)
12 while (getFilePosition() |
|