|
public abstract class FileInputFormat extends InputFormat {
// Generate the list of files and make them into FileSplits
public List getSplits(JobContext job) throws IOException {
// 1. 通过JobContext中获取List;
// 2. 遍历文件属性数据
// 2.1. 如果是空文件,则初始化一个无主机信息的FileSplits实例;
// 2.2. 非空文件,判断是否分片,默认是分片的
// 如果不分片则每个文件作为一个FileSplit
// 计算分片大小splitSize
// getFormatMinSplitSize()返回固定值1
// getMinSplitSize(job)通过Configuration获取,配置参数为(mapred-default.xml):
// mapreduce.input.fileinputformat.split.minsize默认值为0
// minSize的值为1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// 实际调用context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
// 通过Configuration获取,配置参数为(mapred-default.xml无该参数):
// mapreduce.input.fileinputformat.split.maxsize
// 未配置该参数,取Long.MAX_VALUE,maxSize的值为Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);
// generate splits
List splits = new ArrayList();
List files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath(); // 在HDFS上的绝对路径
long length = file.getLen(); // 文件的实际大小
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
// 这里取的是Block块的大小,在2.6里面默认是134217728(即128M)
long blockSize = file.getBlockSize();
// 获取切片大小,computeSplitSize(blockSize, minSize, maxSize)实际调用:
// 1 Long.MAX_VALUE 128M
// Math.max(minSize, Math.min(maxSize, blockSize));
// split的大小刚好等于block块的大小,为128M
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length; // 取文件的实际大小
// 如果文件的实际大小/splitSize > 1.1(即实际大小大于128M * 1.1)
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// getBlockIndex判断is the offset inside this block?
// 第一次length-bytesRemaining的值为0,取block块的第一个复本
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize; // 依次减去分片的大小,对剩余长度再次分片
}
/**
* 加入有一个300M的文件,设置bytesRemaining = length = 300M;
* 1、判定bytesRemaining / splitSize = 300 / 128 > 1.1
* makeSplie-->FileSplit(path, length - bytesRemaining = 0, splitSize=128M)
* bytesRemaining -= splitSize => bytesRemaining = 172M
* 2、判定bytesRemaining / splitSize = 172 / 128 > 1.1
* makeSplie-->FileSplit(path, length - bytesRemaining = 128, splitSize=128M)
* bytesRemaining -= splitSize => bytesRemaining = 44M
* 3、判定bytesRemaining / splitSize = 44 / 128 < 1.1
* while循环结束。
*/
// 多次分片后,最后的数据长度仍不为0但又不足一个分片大小
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
// 在这里把最后的44M又make了一个分片
// makeSplie-->FileSplit(path, length - bytesRemaining = 256, splitSize=44)
}
} else { // not splitable,就取实际大小
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
return splits;
}
}
|
|
|