随风飞世 发表于 2018-10-31 06:39:10

hadoop作业log存储方式及解析

MoveIntermediateToDoneRunnable.run(){  
      hsManager.scanIntermediateDirectory();
  
    }
  

  
    HistoryFileManager.scanIntermediateDirectory{
  
      //获取done_intermediate下的子目录
  
      List userDirList = JobHistoryUtils.localGlobber(
  
            intermediateDoneDirFc, intermediateDoneDirPath, "");
  
      //遍历user子目录
  
      for (FileStatus userDir : userDirList) {
  
          String name = userDir.getPath().getName();
  
          UserLogDir dir = userDirModificationTimeMap.get(name);
  
          if(dir == null) {
  
            dir = new UserLogDir();
  
            UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
  
            if(old != null) {
  
            dir = old;
  
            }
  
          }
  
          //扫描user子目录
  
          dir.scanIfNeeded(userDir);
  
      }
  
    }
  

  
    HistoryFileManager$UserLogDir.scanIfNeeded(FileStatus fs) {
  
      long newModTime = fs.getModificationTime();
  
      //如果目录的修改时间和之前的不一致,则扫描此目录下的文件
  
      if (modTime != newModTime) {
  
            Path p = fs.getPath();
  
            try {
  
            scanIntermediateDirectory(p);
  
            modTime = newModTime;
  
            } catch (IOException e) {
  
            LOG.error("Error while trying to scan the directory " + p, e);
  
            }
  
          }
  
    }
  

  
    HistoryFileManager.scanIntermediateDirectory(final Path absPath){
  
      List fileStatusList = scanDirectoryForHistoryFiles(absPath,
  
            intermediateDoneDirFc);
  
      for (FileStatus fs : fileStatusList) {
  
            //根据-拆解.jhist文件的文件名,设置属性jobIndexInfo的属性(user,jobName等)
  
            JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
  
                  .getName());
  
            //JobID_conf.xml
  
            String confFileName = JobHistoryUtils
  
                  .getIntermediateConfFileName(jobIndexInfo.getJobId());
  
            //JobID.summary(MRv1中summary信息写入jt.log中)
  
            String summaryFileName = JobHistoryUtils
  
                  .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
  
            //实例化HistoryFileInfo,state为IN_INTERMEDIATE
  
            HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
  
                  .getPath().getParent(), confFileName), new Path(fs.getPath()
  
                  .getParent(), summaryFileName), jobIndexInfo, false);
  
            final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
  
            if (old == null || old.didMoveFail()) {
  
                final HistoryFileInfo found = (old == null) ? fileInfo : old;
  
                long cutoff = System.currentTimeMillis() - maxHistoryAge;
  
                //超时需要删除的文件
  
                if(found.getJobIndexInfo().getFinishTime()
页: [1]
查看完整版本: hadoop作业log存储方式及解析