设为首页 收藏本站
查看: 1018|回复: 0

[经验分享] Hadoop2.6.0学习笔记(四)TextInputFormat及RecordReader解析

[复制链接]

尚未签到

发表于 2018-10-30 07:33:10 | 显示全部楼层 |阅读模式
public abstract class FileInputFormat extends InputFormat {  
    // Generate the list of files and make them into FileSplits
  
    public List getSplits(JobContext job) throws IOException {
  
        // 1. 通过JobContext中获取List;
  
        // 2. 遍历文件属性数据
  
        //    2.1. 如果是空文件,则初始化一个无主机信息的FileSplits实例;
  
        //    2.2. 非空文件,判断是否分片,默认是分片的
  
        //         如果不分片则每个文件作为一个FileSplit
  
        //         计算分片大小splitSize
  

  
        // getFormatMinSplitSize()返回固定值1
  
        // getMinSplitSize(job)通过Configuration获取,配置参数为(mapred-default.xml):
  
        // mapreduce.input.fileinputformat.split.minsize默认值为0
  
        // minSize的值为1
  
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  
        // 实际调用context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
  
        // 通过Configuration获取,配置参数为(mapred-default.xml无该参数):
  
        // mapreduce.input.fileinputformat.split.maxsize
  
        // 未配置该参数,取Long.MAX_VALUE,maxSize的值为Long.MAX_VALUE
  
        long maxSize = getMaxSplitSize(job);
  

  
        // generate splits
  
        List splits = new ArrayList();
  
        List files = listStatus(job);
  
        for (FileStatus file: files) {
  
          Path path = file.getPath();     // 在HDFS上的绝对路径
  
          long length = file.getLen();    // 文件的实际大小
  
          if (length != 0) {
  
            BlockLocation[] blkLocations;
  
            if (file instanceof LocatedFileStatus) {
  
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  
            } else {
  
              FileSystem fs = path.getFileSystem(job.getConfiguration());
  
              blkLocations = fs.getFileBlockLocations(file, 0, length);
  
            }
  
            if (isSplitable(job, path)) {
  
              // 这里取的是Block块的大小,在2.6里面默认是134217728(即128M)
  
              long blockSize = file.getBlockSize();
  
              // 获取切片大小,computeSplitSize(blockSize, minSize, maxSize)实际调用:
  
              //          1                Long.MAX_VALUE   128M
  
              // Math.max(minSize, Math.min(maxSize,        blockSize));
  
              // split的大小刚好等于block块的大小,为128M
  
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  

  
              long bytesRemaining = length;   // 取文件的实际大小
  
              // 如果文件的实际大小/splitSize > 1.1(即实际大小大于128M * 1.1)
  
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  
                // getBlockIndex判断is the offset inside this block?
  
                // 第一次length-bytesRemaining的值为0,取block块的第一个复本
  
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  
                            blkLocations[blkIndex].getHosts(),
  
                            blkLocations[blkIndex].getCachedHosts()));
  
                bytesRemaining -= splitSize;    // 依次减去分片的大小,对剩余长度再次分片
  
              }
  

  
              /**
  
              * 加入有一个300M的文件,设置bytesRemaining = length = 300M;
  
              * 1、判定bytesRemaining / splitSize = 300 / 128 > 1.1
  
              *  makeSplie-->FileSplit(path, length - bytesRemaining = 0, splitSize=128M)
  
              *  bytesRemaining -= splitSize => bytesRemaining = 172M
  
              * 2、判定bytesRemaining / splitSize = 172 / 128 > 1.1
  
              *  makeSplie-->FileSplit(path, length - bytesRemaining = 128, splitSize=128M)
  
              *  bytesRemaining -= splitSize => bytesRemaining = 44M
  
              * 3、判定bytesRemaining / splitSize = 44 / 128 < 1.1
  
              *  while循环结束。
  
              */
  

  
              // 多次分片后,最后的数据长度仍不为0但又不足一个分片大小
  
              if (bytesRemaining != 0) {
  
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
  
                           blkLocations[blkIndex].getHosts(),
  
                           blkLocations[blkIndex].getCachedHosts()));
  
                // 在这里把最后的44M又make了一个分片
  
                // makeSplie-->FileSplit(path, length - bytesRemaining = 256, splitSize=44)
  
              }
  
            } else { // not splitable,就取实际大小
  
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
  
                          blkLocations[0].getCachedHosts()));
  
            }
  
          } else {
  
            //Create empty hosts array for zero length files
  
            splits.add(makeSplit(path, 0, length, new String[0]));
  
          }
  
        }
  
        // Save the number of input files for metrics/loadgen
  
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  

  
        return splits;
  
    }
  
}



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-628183-1-1.html 上篇帖子: Hadoop2.6.0学习笔记(三)Hadoop序列化 下篇帖子: ganalia+php+nginx+rrd 监控hadoop
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表