设为首页 收藏本站
查看: 1458|回复: 0

[经验分享] 第七章:小朱笔记hadoop之源码分析-hdfs分析 第九节:block Recovery过程分析

[复制链接]

尚未签到

发表于 2016-12-13 11:04:22 | 显示全部楼层 |阅读模式
第七章:小朱笔记hadoop之源码分析-hdfs分析

第九节:block Recovery过程分析

Lease Recovery Algorithm lease recovery算法:

1) Namenode retrieves lease information
name node查找到lease的信息
2) For each file f in the lease, consider the last block b off
对于lease中的每一个文件,获取其最后一个block b进行以下处理
2.1)  Get the datanodes which contains b
获取包含block b 的全部data node
2.2)  Assign one of the datanodes as the primary datanode p
获取一个data node作为其primary data node
2.3)  p obtains a new generation stamp form the namenode
primary datanode向 nn获取一个新的generation stamp
2.4)  p get the block info from each datanode
primary datanode 向每一个dn获取block info
2.5)  p computes the minimum block length
primary datanode 计算最小块的长度
2.6) p updates the datanodes, which have a valid generation stamp,with the new generation stamp and the minimum block length
primary datanode 用最小块的长度和新生成的 genetation stamp来更新 dn
2.7)  p acknowledges the namenode the update results
primary datanode  ack nn update的结果
2.8)  Namenode updates the BlockInfo
nn 更新 block info
2.9)  Namenode removes f from the lease and removes the lease once all files have been removed
nn 删除文件 f 的lease
2.10) Namenode commit changes to edit log
nn向edit log提交 lease 这个change
      一个client在持有某个文件的Lease情况下,如果写入数据过程中发生宕机,或者其他事故,导致无法继续对文件进行写入。由于该文件的Lease是由namenode来维护的,此时namenode认为该文件正在被该client持有,所以其他client对该文件是不允许进行写入的。
      为了解决上面的问题,namenode中对某个client对应某个文件的Lease是有一个限期的,一旦过了这 个限期,该Lease没有发生任何改变(比如更新时间),没有写入任何数据,那么namenode就认为该lease对应的client发生了异常,需要在 namenode端对这个Lease进行释放,以便其他的client能够对文件进行写入操作。释放Lease的时候会处理正在写入的文件,把该文件的最后一个block和targets datanode数组加入到需要recovery的队列中,进行处理之后等待目标datanode心跳获取该数据。
     datanode获取了需要recovery的block的数据,会遍历targets datanode进行recovery操作,recovery结束会按照最小块的长度进行截取、更新块信息和元信息。
 
第一步:namenode租约检查,准备recovery block 数据
LeaseManager.Monitor.checkLeases 关于租约过程有专门一节讲解。
(1)从namenode内存中找到该filePath对应的文件INode,通常这个时候该INode是一个INodeFileUnderConstruction的实例,表示这个文件是正在被写入,还没有complete的一个文件。
(2)如果该文件的Targets为空,且该文件的block队列为空,表示这个文件是个空文件,那么直接将该文件complete,删除其对应的lease记录,然后返回
(3)如果该文件的Targets为空,且该文件的block队列非空,那么获取该文件的block队列,并找到该队列的最后一个block,将该block的最后一个block对应的 datanode设置为该文件的targets,这个操作的原因在于:由于HDFS的文件只能向最后一个block写入输入,所以lease过期肯定是出 了最后一个block有问题外,其他block应该都是完整的,所以获取最后一个block。而targets表示最后一个block应该保存在哪几个 datanode上,该targets是一个datanode队列,也就是说,namenode知道这最后一个block是在这几台datanode 上,以便向这几个datanode发送block recovery命令
(4)在targets队列中选择一个datanode作为primary datanode,把该文件的最后一个block和targets datanode数组加入到primary datanode的recoverBlocks中。重点关注recoverBlocks这个队列
(5)修改Lease Holder为'NN_Recovery'并续组
 


//源码一:
if(pendingFile.getTargets()==null||pendingFile.getTargets().length== 0){
if (pendingFile.getBlocks().length == 0) {
finalizeINodeFileUnderConstruction(src, pendingFile);
return;
}
// setup the Inode.targets for the last block from the blocksMap
//最后一个block对应的datanode设置为该文件的targets
Block[] blocks = pendingFile.getBlocks();
Block last = blocks[blocks.length-1];
DatanodeDescriptor[] targets =  new DatanodeDescriptor[blocksMap.numNodes(last)];
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
for (int i = 0; it != null && it.hasNext(); i++) {
targets = it.next();
}
pendingFile.setTargets(targets);
}
//指定primary datanode,start lease recovery of the last block for this file.
pendingFile.assignPrimaryDatanode();
//再分配Lease 注意Holder NN_Recovery
Lease reassignedLease = reassignLease(lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);
//续约
leaseManager.renewLease(reassignedLease);
//源码2:
//将该INodeFileUnderConstruction文件分配给指定的客户端进程,也就是执行租约恢复的操作为该文件初始化租约的恢复的处理(存储选择的主Datanode所激活的块列表) ,该过程会挑选一个目前还活着的DataNode,作为租约的主节点,并把<block,block目标     DataNode数组>加到该DataNode的recoverBlocks队列中;
void assignPrimaryDatanode() {
//assign the first alive datanode as the primary datanode
// 指派第一个活跃的为主Datanode结点  
if (targets.length == 0) {
NameNode.stateChangeLog.warn("BLOCK*"
+ " INodeFileUnderConstruction.initLeaseRecovery:"
+ " No blocks found, lease removed.");
}
int previous = primaryNodeIndex;  
//primaryNodeIndex初始值是-1,它用来保证每次找到的primary不在同一个位置
// 从索引previous开始查找到一个活跃的Datanode进程  
for(int i = 1; i <= targets.length; i++) {  
int j = (previous + i)%targets.length;  
if (targets[j].isAlive) { // 保证第j个Datanode处于活跃状态  
DatanodeDescriptor primary = targets[primaryNodeIndex = j];  
//把该文件的最后一个block和targets datanode数组加入到
//primary datanode 的recoverBlocks中
primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
//存储被主Datanode激活的块,实际存储到该Datanode的块队列中  
NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1] + " recovery started, primary=" + primary);  
return;  
}  
}  
}

 
第二步:datanode心跳获取recovery block
     在 HDFS中,datanode是会每隔几秒钟向namenode定期的发送心跳,namenode会返回给datanode一个“命令集(cmds)”的,这个命令集就是namenode需要 datanode执行的某些操作,比如
    (1)将该datanode上的某个block拷贝到其他datanode上去的DNA_TRANSFER命令
    (2)将该datanode上的某个block从物理磁盘上删除的DNA_TRANSFER命令
    (3)停止该datanode的DNA_SHUTDOWN命令
    (4)对某个block进行block recovery的DNA_RECOVERBLOCK命令
         

cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
       从上可以看出,从heartbeat的返回命令集中,就包括了对某个block进行recovery的命令。所以,datanode某个block进行 recovery操作的动作,实际上是来自namenode的指令。也就是说,namenode认为这个block需要做recovery了,并且这个 block在某几个datanode上保存,那么namenode就会在这几个datanode的heartbeat发送过来后,给这几个 datanode返回指令集,指令集中就包括对这个block进行recovery的指令。于是datanode接受到这个指令后,对block进行数据 本身的recovery操作。
    注意: BlockCommand数据结构,它存储了需要执行的操作(action)以及这个操作涉及的block和这个block所对应的所有的datanode。
 

public class BlockCommand extends DatanodeCommand {
Block blocks[];
DatanodeInfo targets[][];
}
第三步:datanode执行 block recovery
datanode上最终会调用recoverBlock方法,此时closeFile=true。它是recoverLease发起的,此时要关闭文件,并使得这个文件的block在datanode上的信息一致。
 

Public Daemon recoverBlocks(final Block[] blocks,final DatanodeInfo[][]targets){
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
for(int i = 0; i < blocks.length; i++) {
try {
logRecoverBlock("NameNode", blocks, targets);
recoverBlock(blocks, false, targets, true);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks, e);
}
}
}
});
d.start();
return d;
}
 (1)由于block recovery 是由primary datanode发起,但该recovery操作需要在三个datanode上对该block进行操作(假设文件副本为3),所以primary datanode接收到命令的时候同时还收到了该block的targets datanode数组(其中就包括该datanode自身)
(2)primary datanode遍历targets datanode数组,对每一个datanode,向其发送一个start block recovery的指令。判断本地调用还是远程RPC,如果是其自身,则直接执行该指令。
(3)start block recovery指令会在datanode的磁盘中找到该block的物理块,并确认该block对应的验证信息和meta信息正确,并返回一个BlockRecord对象,表示这个block正在被recovery。
(4)对每个BlockRecord,查看keepLength标志位是否为true,如果为true,则只recovery blocksize 跟 namenode中记录的blocksize一致的block,否则全部都算。并且block的size为BlockRecord最小的。
(5)对每个物理块,一旦真正开始recovery操作,则进行如下操作:在该datanode上找到该block,同时找到这个block对应的meta文件 (每一个block都对应一个meta文件,用来记录该block的验证码等原信息),更新该block的stamp号(表示该block已经被修改过一 次)
 
(6)如果需要recovery成的block的size 小于实际的block的size,则将实际的block截断成其需要的大小,并更新meta文件和验证信息。
 
第四步:datanode处理recovery结果
primary datanode调用各个BlockRecord对应的datanode进行Block同步,然后向namenode提交块同步信息。

for(BlockRecord r : syncList) {
try {
r.datanode.updateBlock(r.info.getBlock(), newblock, closeFile);
successList.add(r.id);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ newblock + ", datanode=" + r.id + ")", e);
}
}
namenode.commitBlockSynchronization(block,newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,nlist);
 
(1)updateBlock 更新Block
  updateBlock的最外层是一个死循环,循环的结束条件,是没有任何和这个数据块相关的写线程。每次循环,updateBlock都会去调用一个叫tryUpdateBlock的内部方法。tryUpdateBlock发现已经没有线程在写这个块,就会跟新和这个数据块相关的信息,包括元文件和内存中的映射表volumeMap。如果tryUpdateBlock发现还有活跃的线程和该块关联,那么,updateBlock会试图结束该线程,并等在join上等待。

public void updateBlock(Block oldblock, Block newblock) throws IOException {
if (oldblock.getBlockId() != newblock.getBlockId()) {
throw new IOException("Cannot update oldblock (=" + oldblock
+ ") to newblock (=" + newblock + ").");
}

// Protect against a straggler updateblock call moving a block backwards
// in time.
boolean isValidUpdate =
(newblock.getGenerationStamp() > oldblock.getGenerationStamp()) ||
(newblock.getGenerationStamp() == oldblock.getGenerationStamp() &&
newblock.getNumBytes() == oldblock.getNumBytes());
if (!isValidUpdate) {
throw new IOException(
"Cannot update oldblock=" + oldblock +
" to newblock=" + newblock + " since generation stamps must " +
"increase, or else length must not change.");
}

for(;;) {
final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
if (threads == null) {
return;
}
interruptAndJoinThreads(threads);
}
}
 
 

  /**
* Try to update an old block to a new block.
* If there are ongoing create threads running for the old block,
* the threads will be returned without updating the block.
* 用于将旧块截断成新块(调用truncateBlock),并截断相应的元数据文件,以及更新ongoingCreates、volumeMap。
* @return ongoing create threads if there is any. Otherwise, return null.
*/
private synchronized List<Thread> tryUpdateBlock(Block oldblock, Block newblock) throws IOException {
//check ongoing create threads 获取与此块相关的文件及访问这个块的线程
ArrayList<Thread> activeThreads = getActiveThreads(oldblock);
if (activeThreads != null) {
return activeThreads; //如果近期有对此块进行操作,返回存活的操作线程
}
//近期无对此块操作的线程,就更新块  
////获得旧块的文件  
//No ongoing create threads is alive.  Update block.
File blockFile = findBlockFile(oldblock.getBlockId());
if (blockFile == null) {
throw new IOException("Block " + oldblock + " does not exist.");
}
File oldMetaFile = findMetaFile(blockFile);
long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
// First validate the update
//update generation stamp
//旧块stamp比新块stamp大,不合法  
if (oldgs > newblock.getGenerationStamp()) {
throw new IOException("Cannot update block (id=" + newblock.getBlockId()
+ ") generation stamp from " + oldgs
+ " to " + newblock.getGenerationStamp());
}
//update length
//新块的大小大于旧块的大小  
if (newblock.getNumBytes() > oldblock.getNumBytes()) {
throw new IOException("Cannot update block file (=" + blockFile
+ ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes());
}
// Now perform the update
//rename meta file to a tmp file
//旧块元数据文件重命名  
File tmpMetaFile = new File(oldMetaFile.getParent(),oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp());
if (!oldMetaFile.renameTo(tmpMetaFile)){
throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
}
//新块的大小小于旧块的大小 截断旧块和旧块的元数据文件
if (newblock.getNumBytes() < oldblock.getNumBytes()) {
truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
}
//rename the tmp file to the new meta file (with new generation stamp)
File newMetaFile = getMetaFile(blockFile, newblock);
if (!tmpMetaFile.renameTo(newMetaFile)) {
throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
}
updateBlockMap(ongoingCreates, oldblock, newblock);
updateBlockMap(volumeMap, oldblock, newblock);
// paranoia! verify that the contents of the stored block
// matches the block file on disk.
validateBlockMetadata(newblock);
return null;
}
//truncateBlock对旧块blockFile和对应的元数据文件metaFile进行截断,截断后旧块长度为newlen(newlen<oldlen)。  
static void truncateBlock(File blockFile, File metaFile,long oldlen, long newlen) throws IOException {
if (newlen == oldlen) {
return;
}
if (newlen > oldlen) {
throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
+ ") to newlen (=" + newlen + ")");
}
if (newlen == 0) {
// Special case for truncating to 0 length, since there's no previous
// chunk.
RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
try {
//truncate blockFile
blockRAF.setLength(newlen);   
} finally {
blockRAF.close();
}
//update metaFile
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
try {
metaRAF.setLength(BlockMetadataHeader.getHeaderSize());
} finally {
metaRAF.close();
}
return;
}
//由于只是对就块进行截断,所有新块的最后一个校验和字段可能在旧块中不一样,  
//所有setLength进行截断后,要读取最后一个校验和字段  
DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
int checksumsize = dcs.getChecksumSize();
int bpc = dcs.getBytesPerChecksum();
long newChunkCount = (newlen - 1)/bpc + 1;//校验和的段数  
long newmetalen = BlockMetadataHeader.getHeaderSize() + newChunkCount*checksumsize;//新的校验和文件的长度  
long lastchunkoffset = (newChunkCount - 1)*bpc;//最后一个校验和字段的偏移位置  
int lastchunksize = (int)(newlen - lastchunkoffset); //最后一个校验和的开始位置  
byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");//对旧块进行读取  
try {
//truncate blockFile
blockRAF.setLength(newlen);
//read last chunk
blockRAF.seek(lastchunkoffset);
blockRAF.readFully(b, 0, lastchunksize);
} finally {
blockRAF.close();
}
//compute checksum
dcs.update(b, 0, lastchunksize);
dcs.writeValue(b, 0, false);
//update metaFile
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
try {
metaRAF.setLength(newmetalen);
metaRAF.seek(newmetalen - checksumsize);
metaRAF.write(b, 0, checksumsize);
} finally {
metaRAF.close();
}
}
 
 
(2) commitBlockSynchronization
  参数分别是block,数据块;newgenerationstamp,新的时间戳;newlength,新长度;closeFile,是否关闭文件,deleteblock,是否删除文件;newtargets,新的目标列表。
  处理流程:
  参数检查,获取对应的文件,记为pendingFile;从BlocksMap中删除老的信息;如果deleteblock为true,从pendingFile删除Block记录;否则,更新Block的信息;如果不关闭文件,那么写日志保存更新,返回;最后如果关闭文件的话,调用finalizeINodeFileUnderConstruction。
 
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313707-1-1.html 上篇帖子: Kettle5.0.1配置CDH4 下篇帖子: hadoop2.6安装步骤
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表