设为首页 收藏本站
查看: 939|回复: 0

[经验分享] hadoop分析之二元数据备份方案的机制

[复制链接]

尚未签到

发表于 2016-12-13 08:01:34 | 显示全部楼层 |阅读模式
1、NameNode启动加载元数据情景分析


  • NameNode函数里调用FSNamesystemm读取dfs.namenode.name.dir和dfs.namenode.edits.dir构建FSDirectory。
  • FSImage类recoverTransitionRead和saveNameSpace分别实现了元数据的检查、加载、内存合并和元数据的持久化存储。

  • saveNameSpace将元数据写入到磁盘,具体操作步骤:首先将current目录重命名为lastcheckpoint.tmp;然后在创建新的current目录,并保存文件;最后将lastcheckpoint.tmp重命名为privios.checkpoint.
  • checkPoint的过程:Secondary NameNode会通知nameNode产生一个edit log文件edits.new,之后所有的日志操作写入到edits.new文件中。接下来Secondary NameNode会从namenode下载fsimage和edits文件,进行合并产生新的fsimage.ckpt;然后Secondary会将fsimage.ckpt文件上传到namenode。最后namenode会重命名fsimage.ckpt为fsimage,edtis.new为edits;

2、元数据更新及日志写入情景分析



以mkdir为例:

DSC0000.png

logSync代码分析:

DSC0001.png

代码:

public void logSync () throws IOException {
ArrayList<EditLogOutputStream > errorStreams = null ;
long syncStart = 0;
// Fetch the transactionId of this thread.
long mytxid = myTransactionId .get (). txid;
EditLogOutputStream streams[] = null;
boolean sync = false;
try {
synchronized (this) {
assert editStreams. size() > 0 : "no editlog streams" ;
printStatistics (false);
// if somebody is already syncing, then wait
while (mytxid > synctxid && isSyncRunning) {
try {
wait (1000 );
} catch (InterruptedException ie ) {
}
}
//
// If this transaction was already flushed, then nothing to do
//
if (mytxid <= synctxid ) {
numTransactionsBatchedInSync ++;
if (metrics != null) // Metrics is non-null only when used inside name node
metrics .transactionsBatchedInSync .inc ();
return;
}
// now, this thread will do the sync
syncStart = txid ;
isSyncRunning = true;
sync = true;
// swap buffers
for( EditLogOutputStream eStream : editStreams ) {
eStream .setReadyToFlush ();
}
streams =
editStreams .toArray (new EditLogOutputStream[editStreams. size()]) ;
}
// do the sync
long start = FSNamesystem.now();
for (int idx = 0; idx < streams. length; idx++ ) {
EditLogOutputStream eStream = streams [idx ];
try {
eStream .flush ();
} catch (IOException ie ) {
FSNamesystem .LOG .error ("Unable to sync edit log." , ie );
//
// remember the streams that encountered an error.
//
if (errorStreams == null) {
errorStreams = new ArrayList <EditLogOutputStream >( 1) ;
}
errorStreams .add (eStream );
}
}
long elapsed = FSNamesystem.now() - start ;
processIOError (errorStreams , true);
if (metrics != null) // Metrics non-null only when used inside name node
metrics .syncs .inc (elapsed );
} finally {
synchronized (this) {
synctxid = syncStart ;
if (sync ) {
isSyncRunning = false;
}
this.notifyAll ();
}
}
}

3、Backup Node 的checkpoint的过程分析:


DSC0002.png

/**
* Create a new checkpoint
*/
void doCheckpoint() throws IOException {
long startTime = FSNamesystem.now ();
NamenodeCommand cmd =
getNamenode().startCheckpoint( backupNode. getRegistration());
CheckpointCommand cpCmd = null;
switch( cmd. getAction()) {
case NamenodeProtocol .ACT_SHUTDOWN :
shutdown() ;
throw new IOException ("Name-node " + backupNode .nnRpcAddress
+ " requested shutdown.");
case NamenodeProtocol .ACT_CHECKPOINT :
cpCmd = (CheckpointCommand )cmd ;
break;
default:
throw new IOException ("Unsupported NamenodeCommand: "+cmd.getAction()) ;
}
CheckpointSignature sig = cpCmd. getSignature();
assert FSConstants.LAYOUT_VERSION == sig .getLayoutVersion () :
"Signature should have current layout version. Expected: "
+ FSConstants.LAYOUT_VERSION + " actual " + sig. getLayoutVersion();
assert !backupNode .isRole (NamenodeRole .CHECKPOINT ) ||
cpCmd. isImageObsolete() : "checkpoint node should always download image.";
backupNode. setCheckpointState(CheckpointStates .UPLOAD_START );
if( cpCmd. isImageObsolete()) {
// First reset storage on disk and memory state
backupNode. resetNamespace();
downloadCheckpoint(sig);
}
BackupStorage bnImage = getFSImage() ;
bnImage. loadCheckpoint(sig);
sig.validateStorageInfo( bnImage) ;
bnImage. saveCheckpoint();
if( cpCmd. needToReturnImage())
uploadCheckpoint(sig);
getNamenode() .endCheckpoint (backupNode .getRegistration (), sig );
bnImage. convergeJournalSpool();
backupNode. setRegistration(); // keep registration up to date
if( backupNode. isRole( NamenodeRole.CHECKPOINT ))
getFSImage() .getEditLog (). close() ;
LOG. info( "Checkpoint completed in "
+ (FSNamesystem .now() - startTime )/ 1000 + " seconds."
+ " New Image Size: " + bnImage .getFsImageName (). length()) ;
}
}

4、元数据可靠性机制。



  • 配置多个备份路径。NameNode在更新日志或进行Checkpoint的过程,会将元数据放在多个目录下。
  • 对于没一个需要保存的元数据文件,都创建一个输出流,对访问过程中出现的异常输出流进行处理,将其移除。并再合适的时机再次检查移除的数据量是否恢复正常。有效的保证了备份输出流的异常问题。
  • 采用了多种机制来保证元数据的可靠性。例如在checkpoint的过程中,分为几个阶段,通过不同的文件名来标识当前所处的状态。为存储失败后进行恢复提供了可能。

5、元数据的一致性机制。


  • 首先从NameNode启动时,对每个备份目录是否格式化、目录元数据文件名是否正确等进行检查,确保元数据文件间的状态一致性,然后选取最新的加载到内存,这样可以确保HDFS当前状态和最后一次关闭时的状态一致性。
  • 其次,通过异常输出流的处理,可以确保正常输出流数据的一致性。
  • 运用同步机制,确保了输出流一致性问题。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313425-1-1.html 上篇帖子: Hadoop+Zookeeper+HBase分布式安装部署 下篇帖子: Hadoop 关于0.95/1.75 * (number of nodes)误解
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表