|
// Task body of MoveIntermediateToDoneRunnable: it only delegates to
// hsManager, asking it to scan the intermediate directory.
// NOTE(review): hsManager is presumably the HistoryFileManager whose
// scanIntermediateDirectory is shown next in this trace -- confirm.
MoveIntermediateToDoneRunnable.run(){
hsManager.scanIntermediateDirectory();
}
// Walks every entry directly under the intermediate-done directory and asks
// the UserLogDir tracker registered for that entry to rescan it if needed.
// Trackers are created lazily and kept in userDirModificationTimeMap, keyed
// by the directory's name.
HistoryFileManager.scanIntermediateDirectory() {
  // List the sub-directories of done_intermediate. The element type must be
  // spelled out: iterating a raw List yields Object, which cannot be bound to
  // the FileStatus loop variable below.
  List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
      intermediateDoneDirFc, intermediateDoneDirPath, "");
  // Visit each per-user sub-directory.
  for (FileStatus userDir : userDirList) {
    String name = userDir.getPath().getName();
    UserLogDir dir = userDirModificationTimeMap.get(name);
    if (dir == null) {
      dir = new UserLogDir();
      // putIfAbsent returns the entry that was already present, if any; in
      // that case keep using the existing tracker instead of the new one.
      UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
      if (old != null) {
        dir = old;
      }
    }
    // Scan the user sub-directory (the tracker decides whether it is needed).
    dir.scanIfNeeded(userDir);
  }
}
HistoryFileManager$UserLogDir.scanIfNeeded(FileStatus fs) {
long newModTime = fs.getModificationTime();
//如果目录的修改时间和之前的不一致,则扫描此目录下的文件
if (modTime != newModTime) {
Path p = fs.getPath();
try {
scanIntermediateDirectory(p);
modTime = newModTime;
} catch (IOException e) {
LOG.error("Error while trying to scan the directory " + p, e);
}
}
}
HistoryFileManager.scanIntermediateDirectory(final Path absPath){
List fileStatusList = scanDirectoryForHistoryFiles(absPath,
intermediateDoneDirFc);
for (FileStatus fs : fileStatusList) {
//根据-拆解.jhist文件的文件名,设置属性jobIndexInfo的属性(user,jobName等)
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
//JobID_conf.xml
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
//JobID.summary(MRv1中summary信息写入jt.log中)
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
//实例化HistoryFileInfo,state为IN_INTERMEDIATE
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, false);
final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
if (old == null || old.didMoveFail()) {
final HistoryFileInfo found = (old == null) ? fileInfo : old;
long cutoff = System.currentTimeMillis() - maxHistoryAge;
//超时需要删除的文件
if(found.getJobIndexInfo().getFinishTime() |
|
|