设为首页 收藏本站
查看: 2130|回复: 0

[经验分享] [Hadoop源码详解]之一MapReduce篇之InputFormat

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-11 07:50:11 | 显示全部楼层 |阅读模式
个人小站,正在持续整理中,欢迎访问:http://shitouer.cn
  小站博文地址:[Hadoop源码详解]之一MapReduce篇之InputFormat
  

1. 概述
  我们在设置MapReduce输入格式的时候,会调用这样一条语句:

job.setInputFormatClass(KeyValueTextInputFormat.class);
  这条语句保证了输入文件会按照我们预设的格式被读取。KeyValueTextInputFormat即为我们设定的数据读取格式。
  所有的输入格式类都继承自InputFormat,这是一个抽象类。其子类有例如专门用于读取普通文件的FileInputFormat,还有用来读取数据库的DBInputFormat等等。相关类图简单画出如下(推荐新标签中打开图片查看):
DSC0000.png

2. InputFormat
  从InputFormat类图看,InputFormat抽象类仅有两个抽象方法:


  • List getSplits(), 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。
  • RecordReader createRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。
  在后面说到InputSplits的时候,会介绍在getSplits()时需要验证输入文件是否可分割、文件存储时分块的大小和文件大小等因素,所以总体来说,通过InputFormat,Mapreduce框架可以做到:


  • 验证作业输入的正确性
  • 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask
  • 提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用
  InputFormat抽象类源码也很简单,如下供参考(文章格式考虑,删除了部分注释,添加了部分中文注释):

public abstract class InputFormat {
/**
* 仅仅是逻辑分片,并没有物理分片,所以每一个分片类似于这样一个元组
*/
public abstract List getSplits(JobContext context)
throws IOException, InterruptedException;
/**
* Create a record reader for a given split.
*/
public abstract RecordReader createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException;
}
  不同的InputFormat会各自实现不同的文件读取方式以及分片方式,每个输入分片会被单独的map task作为数据源。下面详细介绍输入分片(inputSplit)是什么。

3.InputSplit
  Mappers的输入是一个一个的输入分片,称InputSplit。看源码可知,InputSplit也是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。

public abstract class InputSplit {
/**
* 获取Split的大小,支持根据size对InputSplit排序.
*/
public abstract long getLength() throws IOException, InterruptedException;
/**
* 获取存储该分片的数据所在的节点位置.
*/
public abstract
String[] getLocations() throws IOException, InterruptedException;
}
  下面深入看一个InputSplit的子类:FileSplit类

public class FileSplit extends InputSplit implements Writable {
private Path file;
private long start;
private long length;
private String[] hosts;
/**
* Constructs a split with host information
*
* @param file
*            the file name
* @param start
*            the position of the first byte in the file to process
* @param length
*            the number of bytes in the file to process
* @param hosts
*            the list of hosts containing the block, possibly null
*/
public FileSplit(Path file, long start, long length, String[] hosts) {
this.file = file;
this.start = start;
this.length = length;
this.hosts = hosts;
}
/** The number of bytes in the file to process. */
@Override
public long getLength() {
return length;
}
@Override
public String[] getLocations() throws IOException {
if (this.hosts == null) {
return new String[] {};
} else {
return this.hosts;
}
}
// 略掉部分方法
}
  从源码中可以看出,FileSplit有四个属性:文件路径,分片起始位置,分片长度和存储分片的hosts。用这四项数据,就可以计算出提供给每个Mapper的分片数据。在InputFormat的getSplit()方法中构造分片,分片的四个属性会通过调用FileSplit的Constructor设置。
  再看一个InputSplit的子类:CombineFileSplit。源码如下:

public class CombineFileSplit extends InputSplit implements Writable {
private Path[] paths;
private long[] startoffset;
private long[] lengths;
private String[] locations;
private long totLength;
public CombineFileSplit(Path[] files, long[] start, long[] lengths,
String[] locations) {
initSplit(files, start, lengths, locations);
}
private void initSplit(Path[] files, long[] start, long[] lengths,
String[] locations) {
this.startoffset = start;
this.lengths = lengths;
this.paths = files;
this.totLength = 0;
this.locations = locations;
for (long length : lengths) {
totLength += length;
}
}
public long getLength() {
return totLength;
}
/** Returns all the Paths where this input-split resides */
public String[] getLocations() throws IOException {
return locations;
}
//省略了部分构造函数和方法,深入学习请阅读源文件
}
  为什么介绍该类呢,因为接下来要学习《Hadoop学习(五) – 小文件处理》,深入理解该类,将有助于该节学习。
  上面我们介绍的FileSplit对应的是一个输入文件,也就是说,如果用FileSplit对应的FileInputFormat作为输入格式,那么即使文件特别小,也是作为一个单独的InputSplit来处理,而每一个InputSplit将会由一个独立的Mapper Task来处理。在输入数据是由大量小文件组成的情形下,就会有同样大量的InputSplit,从而需要同样大量的Mapper来处理,大量的Mapper Task创建销毁开销将是巨大的,甚至对集群来说,是灾难性的!
  CombineFileSplit是针对小文件的分片,它将一系列小文件封装在一个InputSplit内,这样一个Mapper就可以处理多个小文件。可以有效的降低进程开销。与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和分片数据所在的host列表四个属性,只不过这些属性不再是一个值,而是一个列表。
  需要注意的一点是,CombineFileSplit的getLength()方法,返回的是这一系列数据的数据的总长度。
  现在,我们已深入的了解了InputSplit的概念,看了其源码,知道了其属性。我们知道数据分片是在InputFormat中实现的,接下来,我们就深入InputFormat的一个子类,FileInputFormat看看分片是如何进行的。

4. FileInputFormat
  FileInputFormat中,分片方法代码及详细注释如下,就不再详细解释该方法:

public List getSplits(JobContext job) throws IOException {
// 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小。
// 由源码可知,这两个值可以通过mapred.min.split.size和mapred.max.split.size来设置
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// splits链表用来存储计算得到的输入分片结果
List splits = new ArrayList();
// files链表存储由listStatus()获取的输入文件列表,listStatus比较特殊,我们在下面详细研究
List files = listStatus(job);
for (FileStatus file : files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
// 获取该文件所有的block信息列表[hostname, offset, length]
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);
// 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割
// 是否分割可以自行重写FileInputFormat的isSplitable来控制
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
// 计算分片大小
// 即 Math.max(minSize, Math.min(maxSize, blockSize));
// 也就是保证在minSize和maxSize之间,且如果minSize

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85315-1-1.html 上篇帖子: Hadoop第一天---初识Hadoop 下篇帖子: 初学Hadoop之图解MapReduce与WordCount示例分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表