设为首页 收藏本站
查看: 679|回复: 0

[经验分享] Hadoop源代码分析(三三)

[复制链接]

尚未签到

发表于 2016-12-12 06:34:04 | 显示全部楼层 |阅读模式
  下面来看一个大家伙:
public DatanodeCommand sendHeartbeat(DatanodeRegistration nodeReg,

                                       long capacity,

                                       long dfsUsed,

                                       long remaining,

                                       int xmitsInProgress,

                                       int xceiverCount) throws IOException

  DataNode发送到NameNode的心跳信息。细心的人会发现,请求的内容还是DatanodeRegistration,应答换成DatanodeCommand了。DatanodeCommand类图如下:
  前面介绍DataNode时,已经分析过了DatanodeCommand支持的命令:
  DNA_TRANSFER:拷贝数据块到其他DataNode

  DNA_INVALIDATE:删除数据块

  DNA_SHUTDOWN:关闭DataNode

  DNA_REGISTER:DataNode重新注册

  DNA_FINALIZE:提交升级

  DNA_RECOVERBLOCK:恢复数据块

 

  

 
  有了上面这些基础,我们来看FSNamesystem.handleHeartbeat的处理过程:
l           调用getDatanode方法找对应的DatanodeDescriptor,保存于变量nodeinfo(可能为null)中,如果现有NameNode上记录的StorageID和请求的不一样,返回DatanodeCommand.REGISTER,让DataNode从新注册。
l           如果发现当前节点需要关闭(已经isDecommissioned),抛异常DisallowedDatanodeException。
l           nodeinfo是空或者现在状态不是活的,返回DatanodeCommand.REGISTER,让DataNode从新注册。
l           更新系统的状态,包括capacityTotal,capacityUsed,capacityRemaining和totalLoad;
l           接下来按顺序看有没有可能的恢复数据块/拷贝数据块到其他DataNode/删除数据块/升级命令(不讨论)。一次返回只能有一条命令,按上面优先顺序。
  下面分析应答的命令是如何构造的。
  首先是DNA_RECOVERBLOCK(恢复数据块),那是个非常长的流程,同时需要回去讨论DataNode上的一些功能,我们在后面介绍它。
  对于DNA_TRANSFER(拷贝数据块到其他DataNode),从DatanodeDescriptor.replicateBlocks中取出尽可能多的项目,放到BlockCommand中。在DataNode中,命令由transferBlocks执行,前面我们已经分析过啦。
  删除数据块DNA_INVALIDATE也很简单,从DatanodeDescriptor.invalidateBlocks中获取尽可能多的项目,放到BlockCommand中,DataNode中的动作,我们也分析过。
  我们来讨论DNA_RECOVERBLOCK(恢复数据块),在讨论DataNode的过程中,我们没有讲这个命令是用来干什么的,还有它在DataNode上的处理流程,是好好分析分析这个流程的时候了。DNA_RECOVERBLOCK命令通过DatanodeDescriptor.getLeaseRecoveryCommand获取,获取过程很简单,将DatanodeDescriptor对象中队列recoverBlocks的所有内容取出,放入BlockCommand的Block中,设置BlockCommand为DNA_RECOVERBLOCK,就OK了。
  关键是,这个队列里的信息是用来干什么的。我们先来看那些操作会向这个队列加东西,调用关系图如下:
  

 
  租约有两个超时时间,一个被称为软超时(1分钟),另一个是硬超时(1小时)。如果租约软超时,那么就会触发internalReleaseLease方法,如下:
    void internalReleaseLease(Lease lease, String src) throws IOException
  该方法执行:
l           检查src对应的INodeFile,如果不存在,不处于构造状态,返回;
l           文件处于构造状态,而文件目标DataNode为空,而且没有数据块,则finalize该文件(该过程在completeFileInternal中已经讨论过,租约在过程中被释放),并返回;
l           文件处于构造状态,而文件目标DataNode为空,数据块非空,则将最后一个数据块存放的DataNode目标取出(在BlocksMap中),然后设置为文件现在的目标DataNode;
l           调用INodeFileUnderConstruction.assignPrimaryDatanode,该过程会挑选一个目前还活着的DataNode,作为租约的主节点,并把<block,block目标DataNode数组>加到该DataNode的recoverBlocks队列中;
l           更新租约。
  上面分析了租约软超时的情况下NameNode发生租约恢复的过程。DataNode上收到这个命令后,将会启动一个新的线程,该线程为每个Block调用recoverBlock方法:recoverBlock(blocks, false, targets, true)。
  private LocatedBlock recoverBlock(Block block, boolean keepLength,

      DatanodeID[] datanodeids, boolean closeFile) throws IOException

  它的流程并不复杂,但是分支很多,如下图(蓝线是上面输入,没有异常走的流程):
  

 
  首先是判断进来的Block是否在ongoingRecovery中,如果存在,返回,不存在,加到ongoingRecovery中。
  
  接下来是个循环(框内部分是循环体,奇怪,没找到表示循环的符号),对每一个DataNode,获取Block的BlockMetaDataInfo(下面还会分析),这需要调用到DataNode间通信的接口上的方法getBlockMetaDataInfo。然后分情况看要不要把信息保存下来(图中间的几个判断),其中包括要进行同步的节点。
  根据参数,更新数据块信息,然后调用syncBlock并返回syncBlock生产的LocatedBlock。
  上面的这一圈,对于我们这个输入常数来说,就是把Block的长度,更新成为拥有最新时间戳的最小长度值,并得到要更新的节点列表,然后调用syncBlock更新各节点。
  getBlockMetaDataInfo用于获取Block的BlockMetaDataInfo,包括Block的generationStamp,最后校验时间,同时它还会检查数据块文件的元信息,如果出错,会抛出异常。
  syncBlock定义如下:
private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,

      boolean closeFile)

  它的流程是:
l           如果syncList为空,通过commitBlockSynchronization向NameNode提交这次恢复;
l           syncList不为空,那么先NameNode申请一个新的Stamp,并根据上面得到的长度,构造一个新的数据块信息newblock;
l           对于没一个syncList中的DataNode,调用它们上面的updateBlock,更新信息;更新信息如果返回OK,记录下来;
l           如果更新了信息的DataNode不为空,调用commitBlockSynchronization提交这次恢复;并生成LocatedBlock;
l           如果更新的DataNode为空,抛异常。
  通过syncBlock,所有需要恢复的DataNode上的Block信息都被更新。
  DataNode上的updateBlock方法我们前面已经介绍了,就不再分析。
  下面我们来看NameNode的commitBlockSynchronization方法,它在上面的过程中用于提交数据块恢复:
public void commitBlockSynchronization(Block block,

      long newgenerationstamp, long newlength,

      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets

      )

  参数分别是block,数据块;newgenerationstamp,新的时间戳;newlength,新长度;closeFile,是否关闭文件,deleteblock,是否删除文件;newtargets,新的目标列表。
  上面的两次调用,输入参数分别是:
  commitBlockSynchronization(block, 0, 0, closeFile, true, DatanodeID.EMPTY_ARRAY);
commitBlockSynchronization(block, newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false, nlist);

  处理流程是:
l           参数检查;
l           获取对应的文件,记为pendingFile;
l           BlocksMap中删除老的信息;
l           如果deleteblock为true,从pendingFile删除Block记录;
l           否则,更新Block的信息;
l           如果不关闭文件,那么写日志保存更新,返回;
l           关闭文件的话,调用finalizeINodeFileUnderConstruction。
  这块比较复杂,不仅涉及了NameNode和DataNode间的通信,而且还存在对于DataNode和DataNode间的通信(DataNode间的通信就只支持这两个方法,如下图)。后面介绍DFSClient的时候,我们还会再回来分析它的功能,以获取全面的理解。
  

 
  

  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312812-1-1.html 上篇帖子: Hadoop源代码分析(三二) 下篇帖子: Hadoop源代码分析(三四)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表