ditel posted on 2015-11-11 13:37:44

[Framework Analysis] Hadoop System Analysis (5) -- Other NameNode Topics

  upgrade/rollback/importCheckpoint


  In FSImage.recoverTransitionRead, the upgrade/rollback/importCheckpoint startup options each trigger a special operation before the NameNode starts. The code is as follows:
  

switch(startOpt) {
  case UPGRADE:
    doUpgrade();
    return false; // upgrade saved image already
  case IMPORT:
    doImportCheckpoint();
    return true;
  case ROLLBACK:
    doRollback();
    break;
  case REGULAR:
    // just load the image
}
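The control flow of that switch is easy to misread, so here is a minimal, self-contained sketch of it. Everything in it is hypothetical scaffolding rather than Hadoop code: the `StartupDispatchSketch` class, the `calls` list, and the `describe` helper are made up for illustration, and the behaviour after the switch (a regular image load that returns `false`) is an assumption, since the post does not show that part of the method. The boolean is read here as "the caller still needs to save the image", which is how the `upgrade saved image already` comment suggests it is used.

```java
import java.util.ArrayList;
import java.util.List;

public class StartupDispatchSketch {

    // Hypothetical mirror of the startup options named in the post.
    public enum StartupOption { REGULAR, UPGRADE, ROLLBACK, IMPORT }

    // Records which (stubbed) steps ran, so the control flow is observable.
    public final List<String> calls = new ArrayList<>();

    // Returns true when the caller is still expected to save the image.
    public boolean recover(StartupOption opt) {
        switch (opt) {
            case UPGRADE:
                calls.add("doUpgrade");
                return false;              // upgrade already saved the image
            case IMPORT:
                calls.add("doImportCheckpoint");
                return true;
            case ROLLBACK:
                calls.add("doRollback");
                break;                     // falls out of the switch
            case REGULAR:
                break;                     // nothing special to do
        }
        // Assumption: ROLLBACK and REGULAR continue with a normal image load.
        calls.add("loadFSImage");
        return false;                      // placeholder result for this sketch
    }

    // Runs one option on a fresh instance and summarises what happened.
    public static String describe(StartupOption opt) {
        StartupDispatchSketch sketch = new StartupDispatchSketch();
        boolean needToSave = sketch.recover(opt);
        return sketch.calls + " needToSave=" + needToSave;
    }

    public static void main(String[] args) {
        for (StartupOption opt : StartupOption.values()) {
            System.out.println(opt + ": " + describe(opt));
        }
    }
}
```

The point of the sketch is only that UPGRADE and IMPORT return early, while ROLLBACK and REGULAR leave the switch and share whatever code follows it.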
  


hadoop namenode -upgrade

Performs the upgrade of the Hadoop NameNode. It calls FSImage.doUpgrade, whose main job is to keep the existing current directory as previous and save the upgraded fsimage as the new current.

if(getDistributedUpgradeState()) {
  // only distributed upgrade need to continue
  // don't do version upgrade
  this.loadFSImage();
  initializeDistributedUpgrade();
  return;
}
// Upgrade is allowed only if there are
// no previous fs states in any of the directories
for (Iterator<StorageDirectory> it =
         dirIterator(); it.hasNext();) {
  StorageDirectory sd = it.next();
  // If a previous directory already exists, upgrade cannot be run again
  if (sd.getPreviousDir().exists())
    throw new InconsistentFSStateException(sd.getRoot(),
        "previous fs state should not exist during upgrade. "
        + "Finalize or rollback first.");
}
// Load the most recent fsimage on disk into memory
this.loadFSImage();
// Do upgrade for each directory
long oldCTime = this.getCTime();
this.cTime = FSNamesystem.now(); // generate new cTime for the state
int oldLV = this.getLayoutVersion();
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.checkpointTime = FSNamesystem.now();
for (Iterator<StorageDirectory> it =
         dirIterator(); it.hasNext();) {
  StorageDirectory sd = it.next();
  LOG.info("Upgrading image directory " + sd.getRoot()
      + ".\n   old LV = " + oldLV
      + "; old CTime = " + oldCTime
      + ".\n   new LV = " + this.getLayoutVersion()
      + "; new CTime = " + this.getCTime());
  File curDir = sd.getCurrentDir();
  File prevDir = sd.getPreviousDir();
  File tmpDir = sd.getPreviousTmp();
  assert curDir.exists() : "Current directory must exist.";
  assert !prevDir.exists() : "prvious directory must not exist.";
  assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
  // Rename the current directory to previous.tmp
  rename(curDir, tmpDir);
  // Save the in-memory fsimage into current
  saveCurrent(sd);
  // Rename previous.tmp to previous
  rename(tmpDir, prevDir);
  // Mark the upgrade as not finalized (it is closed out later by finalize
  // if the upgrade succeeded, or by rollback if it failed)
  isUpgradeFinalized = false;
  LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
}
// Initialize post-upgrade work, mainly computing the version and writing the VERSION file
initializeDistributedUpgrade();
// Open the edit log
editLog.open();
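
The per-directory part of the upgrade (current to previous.tmp, save a new current, previous.tmp to previous) can be tried out on a scratch directory. The following is a rough sketch using `java.nio.file`, not the Hadoop implementation: the class `UpgradeDirSketch`, the `demo` helper, and the use of a single `fsimage` file holding a short string are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class UpgradeDirSketch {

    // Applies the three-step swap to one storage directory root.
    // A single "fsimage" file stands in for everything saveCurrent(sd) writes.
    public static void upgrade(Path root, String upgradedImage) throws IOException {
        Path cur = root.resolve("current");
        Path prev = root.resolve("previous");
        Path tmp = root.resolve("previous.tmp");
        if (Files.exists(prev)) {
            throw new IOException("previous fs state should not exist during upgrade");
        }
        Files.move(cur, tmp);        // current -> previous.tmp
        Files.createDirectory(cur);  // fresh, empty current
        Files.write(cur.resolve("fsimage"),
                upgradedImage.getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, prev);       // previous.tmp -> previous
    }

    private static String read(Path file) throws IOException {
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }

    // Builds a throwaway directory, upgrades it, and reports
    // "<content of previous/fsimage>/<content of current/fsimage>".
    public static String demo() {
        try {
            Path root = Files.createTempDirectory("nn-upgrade-sketch");
            Path cur = Files.createDirectory(root.resolve("current"));
            Files.write(cur.resolve("fsimage"), "old".getBytes(StandardCharsets.UTF_8));
            upgrade(root, "new");
            return read(root.resolve("previous").resolve("fsimage"))
                    + "/" + read(root.resolve("current").resolve("fsimage"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Going through an intermediate previous.tmp name means the directory is only called previous once the new current has been written in full, so an interrupted run leaves a recognisable half-way state instead of a previous directory next to a missing or partial current.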

hadoop namenode -rollback

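Rolls back after a failed Hadoop upgrade. Its main job is to restore the data directories that upgrade changed: current is renamed to removed.tmp, previous is renamed back to current, and finally removed.tmp is deleted.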

// Rollback is allowed only if there is
// a previous fs states in at least one of the storage directories.
// Directories that don't have previous state do not rollback
boolean canRollback = false;
FSImage prevState = new FSImage();
prevState.layoutVersion = FSConstants.LAYOUT_VERSION;
for (Iterator<StorageDirectory> it =
         dirIterator(); it.hasNext();) {
  // Load the version information from the previous directory into memory;
  // if there is no previous directory, read the current directory instead
  StorageDirectory sd = it.next();
  File prevDir = sd.getPreviousDir();
  if (!prevDir.exists()) { // use current directory then
    LOG.info("Storage directory " + sd.getRoot()
        + " does not contain previous fs state.");
    sd.read(); // read and verify consistency with other directories
    continue;
  }
  StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot());
  sdPrev.read(sdPrev.getPreviousVersionFile()); // read and verify consistency of the prev dir
  canRollback = true;
}
if (!canRollback)
  throw new IOException("Cannot rollback. "
      + "None of the storage directories contain previous fs state.");
// Now that we know all directories are going to be consistent
// Do rollback for each directory containing previous state
for (Iterator<StorageDirectory> it =
         dirIterator(); it.hasNext();) {
  StorageDirectory sd = it.next();
  File prevDir = sd.getPreviousDir();
  if (!prevDir.exists())
    continue;
  LOG.info("Rolling back storage directory " + sd.getRoot()
      + ".\n   new LV = " + prevState.getLayoutVersion()
      + "; new CTime = " + prevState.getCTime());
  File tmpDir = sd.getRemovedTmp();
  assert !tmpDir.exists() : "removed.tmp directory must not exist.";
  File curDir = sd.getCurrentDir();
  assert curDir.exists() : "Current directory must exist.";
  // rename current to removed.tmp
  rename(curDir, tmpDir);
  // rename previous to current
  rename(prevDir, curDir);
  // delete removed.tmp
  deleteDir(tmpDir);
  LOG.info("Rollback of " + sd.getRoot() + " is complete.");
}
isUpgradeFinalized = true;
// check whether name-node can start in regular mode
verifyDistributedUpgradeProgress(StartupOption.REGULAR);
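
For comparison with the upgrade path, the rollback renames can be reproduced on a scratch directory as well. This is only a sketch under assumptions: `RollbackDirSketch`, its `deleteDir` helper, and the one-file `fsimage` contents are invented for the example, and the version checks done in the first loop above are left out entirely.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RollbackDirSketch {

    // Returns false (and changes nothing) when the directory has no previous state.
    public static boolean rollback(Path root) throws IOException {
        Path cur = root.resolve("current");
        Path prev = root.resolve("previous");
        Path tmp = root.resolve("removed.tmp");
        if (!Files.isDirectory(prev)) {
            return false;
        }
        Files.move(cur, tmp);   // current -> removed.tmp
        Files.move(prev, cur);  // previous -> current
        deleteDir(tmp);         // discard the post-upgrade state
        return true;
    }

    // Deletes a directory tree, children before their parents.
    static void deleteDir(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    // Reports "<rolled back?>/<content of current/fsimage>/<any leftovers?>".
    public static String demo() {
        try {
            Path root = Files.createTempDirectory("nn-rollback-sketch");
            Path cur = Files.createDirectory(root.resolve("current"));
            Path prev = Files.createDirectory(root.resolve("previous"));
            Files.write(cur.resolve("fsimage"), "upgraded".getBytes(StandardCharsets.UTF_8));
            Files.write(prev.resolve("fsimage"), "original".getBytes(StandardCharsets.UTF_8));
            boolean rolledBack = rollback(root);
            String image = new String(
                    Files.readAllBytes(root.resolve("current").resolve("fsimage")),
                    StandardCharsets.UTF_8);
            boolean leftovers = Files.exists(root.resolve("previous"))
                    || Files.exists(root.resolve("removed.tmp"));
            return rolledBack + "/" + image + "/" + leftovers;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

As in the original code, a directory without previous is skipped rather than treated as an error; the "cannot rollback" failure only applies when no directory at all has a previous state.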

hadoop namenode -importCheckpoint

Starts the NameNode from a backed-up checkpoint, then saves the loaded checkpoint as current.

FSImage ckptImage = new FSImage();
FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
// replace real image with the checkpoint image
FSImage realImage = fsNamesys.getFSImage();
assert realImage == this;
fsNamesys.dir.fsImage = ckptImage;
// load from the checkpoint dirs
try {
  // Read the checkpointDirs and checkpointEditsDirs directories, merge them, and load the result into memory.
  // checkpointDirs comes from the fs.checkpoint.dir setting and is null if unset; on the secondarynamenode the default is /tmp/hadoop/dfs/namesecondary
  // checkpointEditsDirs comes from the fs.checkpoint.edits.dir setting and is null if unset; on the secondarynamenode the default is /tmp/hadoop/dfs/namesecondary
  ckptImage.recoverTransitionRead(checkpointDirs, checkpointEditsDirs,
      StartupOption.REGULAR);
} finally {
  ckptImage.close();
}
// return back the real image
realImage.setStorageInfo(ckptImage);
fsNamesys.dir.fsImage = realImage;
// and save it
saveNamespace(false);
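
The shape of this method is a temporary swap: install a throwaway image, load the checkpoint through it, copy the storage info across, then put the real image back. A stripped-down model of that pattern is sketched below. The `Image` and `Namesystem` classes are hypothetical stand-ins, not Hadoop types, and "loading" is reduced to assigning a string.

```java
public class ImportCheckpointSketch {

    // Hypothetical stand-in for FSImage: tracks only storage info and a closed flag.
    public static class Image {
        String storageInfo = "empty";
        boolean closed = false;
    }

    // Hypothetical stand-in for the namesystem, which holds the active image.
    public static class Namesystem {
        Image image = new Image();
    }

    public static void importCheckpoint(Namesystem ns, String checkpointInfo) {
        Image realImage = ns.image;
        Image ckptImage = new Image();
        ns.image = ckptImage;                        // swap the checkpoint image in
        try {
            ckptImage.storageInfo = checkpointInfo;  // stands in for recoverTransitionRead
        } finally {
            ckptImage.closed = true;                 // always release the temporary image
        }
        realImage.storageInfo = ckptImage.storageInfo; // stands in for setStorageInfo
        ns.image = realImage;                        // swap the real image back
    }

    // Reports "<same image object as before?>/<storage info after import>".
    public static String demo() {
        Namesystem ns = new Namesystem();
        Image original = ns.image;
        importCheckpoint(ns, "ckpt-42");
        return (ns.image == original) + "/" + ns.image.storageInfo;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

After the call the namesystem points at the same image object as before, now carrying the checkpoint's storage info, which is the state the original code then persists with saveNamespace.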

hadoop namenode -finalize

Once the upgrade is complete, run hadoop namenode -finalize to confirm it. After finalize, rollback is no longer possible.

Back in NameNode.createNameNode, execution enters the FINALIZE branch:

case FINALIZE:
  aborted = finalize(conf, true);
  System.exit(aborted ? 1 : 0);

Execution then enters FSImage.doFinalize to complete the upgrade. Its main work is to delete the previous directory and mark the upgrade as finished.

File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) { // already discarded
  LOG.info("Directory " + prevDir + " does not exist.");
  LOG.info("Finalize upgrade for " + sd.getRoot() + " is not required.");
  return;
}
LOG.info("Finalizing upgrade for storage directory "
    + sd.getRoot() + "."
    + (getLayoutVersion()==0 ? "" :
        "\n   cur LV = " + this.getLayoutVersion()
        + "; cur CTime = " + this.getCTime()));
assert sd.getCurrentDir().exists() : "Current directory must exist.";
final File tmpDir = sd.getFinalizedTmp();
// rename previous to finalized.tmp
rename(prevDir, tmpDir);
// delete finalized.tmp
deleteDir(tmpDir);
isUpgradeFinalized = true;
LOG.info("Finalize upgrade for " + sd.getRoot() + " is complete.");
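
Put together, upgrade, rollback, and finalize all revolve around whether a previous directory exists. The following in-memory model is a sketch of that lifecycle only: `UpgradeLifecycleSketch` and its method names are hypothetical, a set of directory names replaces the real file system, and the exception messages merely echo the ones quoted in the code above.

```java
import java.util.HashSet;
import java.util.Set;

public class UpgradeLifecycleSketch {

    // Names of the directories that currently exist under one storage root.
    private final Set<String> dirs = new HashSet<>();

    public UpgradeLifecycleSketch() {
        dirs.add("current");
    }

    // Upgrade keeps the old state as "previous"; refused if one already exists.
    public void upgrade() {
        if (dirs.contains("previous")) {
            throw new IllegalStateException("Finalize or rollback first.");
        }
        dirs.add("previous");
    }

    // Rollback consumes "previous"; refused if there is nothing to go back to.
    public void rollback() {
        if (!dirs.contains("previous")) {
            throw new IllegalStateException("Cannot rollback: no previous fs state.");
        }
        dirs.remove("previous");
    }

    // Finalize discards "previous"; returns false if it was already gone.
    public boolean finalizeUpgrade() {
        return dirs.remove("previous");
    }

    public boolean canRollback() {
        return dirs.contains("previous");
    }

    // Reports "<rollback possible before finalize>/<after finalize>/<rollback outcome>".
    public static String demo() {
        UpgradeLifecycleSketch nn = new UpgradeLifecycleSketch();
        nn.upgrade();
        boolean before = nn.canRollback();
        nn.finalizeUpgrade();
        boolean after = nn.canRollback();
        String outcome;
        try {
            nn.rollback();
            outcome = "rolled back";
        } catch (IllegalStateException e) {
            outcome = "rejected";
        }
        return before + "/" + after + "/" + outcome;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Read this way, "finalize cannot be undone" simply follows from finalize deleting the only copy of the pre-upgrade state.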

Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission.