public abstract class InputFormat {
/**
* 仅仅是逻辑分片,并没有物理分片,所以每一个分片类似于这样一个元组
*/
public abstract List getSplits(JobContext context)
throws IOException, InterruptedException;
/**
* Create a record reader for a given split.
*/
public abstract RecordReader createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException;
}
不同的InputFormat会各自实现不同的文件读取方式以及分片方式,每个输入分片会被单独的map task作为数据源。下面详细介绍输入分片(inputSplit)是什么。
public abstract class InputSplit {
/**
* 获取Split的大小,支持根据size对InputSplit排序.
*/
public abstract long getLength() throws IOException, InterruptedException;
/**
* 获取存储该分片的数据所在的节点位置.
*/
public abstract
String[] getLocations() throws IOException, InterruptedException;
}
下面深入看一个InputSplit的子类:FileSplit类
public class FileSplit extends InputSplit implements Writable {
private Path file;
private long start;
private long length;
private String[] hosts;
/**
* Constructs a split with host information
*
* @param file
* the file name
* @param start
* the position of the first byte in the file to process
* @param length
* the number of bytes in the file to process
* @param hosts
* the list of hosts containing the block, possibly null
*/
public FileSplit(Path file, long start, long length, String[] hosts) {
this.file = file;
this.start = start;
this.length = length;
this.hosts = hosts;
}
/** The number of bytes in the file to process. */
@Override
public long getLength() {
return length;
}
@Override
public String[] getLocations() throws IOException {
if (this.hosts == null) {
return new String[] {};
} else {
return this.hosts;
}
}
// 略掉部分方法
}
从源码中可以看出,FileSplit有四个属性:文件路径,分片起始位置,分片长度和存储分片的hosts。用这四项数据,就可以计算出提供给每个Mapper的分片数据。在InputFormat的getSplit()方法中构造分片,分片的四个属性会通过调用FileSplit的Constructor设置。
再看一个InputSplit的子类:CombineFileSplit。源码如下:
public class CombineFileSplit extends InputSplit implements Writable {
private Path[] paths;
private long[] startoffset;
private long[] lengths;
private String[] locations;
private long totLength;
public CombineFileSplit(Path[] files, long[] start, long[] lengths,
String[] locations) {
initSplit(files, start, lengths, locations);
}
private void initSplit(Path[] files, long[] start, long[] lengths,
String[] locations) {
this.startoffset = start;
this.lengths = lengths;
this.paths = files;
this.totLength = 0;
this.locations = locations;
for (long length : lengths) {
totLength += length;
}
}
public long getLength() {
return totLength;
}
/** Returns all the Paths where this input-split resides */
public String[] getLocations() throws IOException {
return locations;
}
//省略了部分构造函数和方法,深入学习请阅读源文件
}
为什么介绍该类呢,因为接下来要学习《Hadoop学习(五) – 小文件处理》,深入理解该类,将有助于该节学习。
上面我们介绍的FileSplit对应的是一个输入文件,也就是说,如果用FileSplit对应的FileInputFormat作为输入格式,那么即使文件特别小,也是作为一个单独的InputSplit来处理,而每一个InputSplit将会由一个独立的Mapper Task来处理。在输入数据是由大量小文件组成的情形下,就会有同样大量的InputSplit,从而需要同样大量的Mapper来处理,大量的Mapper Task创建销毁开销将是巨大的,甚至对集群来说,是灾难性的!
CombineFileSplit是针对小文件的分片,它将一系列小文件封装在一个InputSplit内,这样一个Mapper就可以处理多个小文件。可以有效的降低进程开销。与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和分片数据所在的host列表四个属性,只不过这些属性不再是一个值,而是一个列表。
需要注意的一点是,CombineFileSplit的getLength()方法,返回的是这一系列数据的数据的总长度。
现在,我们已深入的了解了InputSplit的概念,看了其源码,知道了其属性。我们知道数据分片是在InputFormat中实现的,接下来,我们就深入InputFormat的一个子类,FileInputFormat看看分片是如何进行的。