设为首页 收藏本站
查看: 849|回复: 0

[经验分享] Hadoop源代码分析(四零)

[复制链接]

尚未签到

发表于 2016-12-12 06:36:37 | 显示全部楼层 |阅读模式
  有了上面的基础,我们可以来解剖DFSOutputStream了。先看构造函数:
    private DFSOutputStream(String src, long blockSize, Progressable progress,

        int bytesPerChecksum) throws IOException


    DFSOutputStream(String src, FsPermission masked, boolean overwrite,

        short replication, long blockSize, Progressable progress,

        int buffersize, int bytesPerChecksum) throws IOException


    DFSOutputStream(String src, int buffersize, Progressable progress,

        LocatedBlock lastBlock, FileStatus stat,

        int bytesPerChecksum) throws IOException {

  这些构造函数的参数主要有:文件名src;进度回调函数progress(预留接口,目前未使用);数据块大小blockSize;Block副本数replication;每个校验chunk的大小bytesPerChecksum;文件权限masked;是否覆盖原文件标记overwrite;文件状态信息stat;文件的最后一个Block信息lastBlock;buffersize(?未见引用)。
  后面两个构造函数会调用第一个构造函数,这个函数会调用父类的构造函数,并设置对象的src,blockSize,progress和checksum属性。
  第二个构造函数会调用namenode.create方法,在文件空间中建立文件,并启动DataStreamer,它被DFSClient的create方法调用。第三个构造函数被DFSClient的append方法调用,显然,这种情况比价复杂,文件拥有一些数据块,添加数据往往添加在最后的数据块上。同时,append方法调用时,Client已经知道了最后一个Block的信息和文件的一些信息,如FileStatus中包含的Block大小,文件权限位等等。结合这些信息,构造函数需要计算并设置一些对象成员变量的值,并试图从可能的错误中恢复(调用processDatanodeError),最后启动DataStreamer。
  我们先看正常流程,前面已经分析过,通过FSOutputSummer,HDFS客户端能将流转换成package,这个包是通过writeChunk,发送出去的,下面是它们的调用关系。
  


  
  在检查完一系列的状态以后,writeChunk先等待,直到dataQueue中未发送的包小于门限值。如果现在没有可用的Packet对象,则创建一个Packet对象,往Packet中写数据,包括校验值和数据。如果数据包被写满,那么,将它放入发送队列dataQueue中。writeChunk的过程比较简单,这里的写入,也只是把数据写到本地队列,等待DataStreamer发送,没有实际写到DataNode上。
  createBlockOutputStream用于建立到第一个DataNode的连接,它的声明如下:
private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,

                    boolean recoveryFlag)

  nodes是所有接收数据的DataNode列表,client就是客户端名称,recoveryFlag指示是否是为错误恢复建立的连接。createBlockOutputStream很简单,打开到第一个DataNode的连接,然后发送下面格式的数据包,并等待来自DataNode的Ack。如果出错,记录出错的DataNode在nodes中的位置,设置errorIndex并返回false。
  

  当recoveryFlag指示为真时,意味着这次写是一次恢复操作,对于DataNode来说,这意味着为写准备的临时文件(在tmp目录中)可能已经存在,需要进行一些特殊处理,具体请看FSDataset的实现。
  当Client写数据需要一个新的Block的时候,可以调用nextBlockOutputStream方法。
    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException

  这个方法的实现很简单,首先调用locateFollowingBlock(包含了重试和出错处理),通过namenode.addBlock获取一个新的数据块,返回的是DatanodeInfo列表,有了这个列表,就可以建立写数据的pipe了。下一个大动作就是调用上面的createBlockOutputStream,建立到DataNode的连接了。
  有了上面的准备,我们来分析processDatanodeError,它的主要流程是:
l           参数检查;
l           关闭可能还打开着的blockStream和blockReplyStream;
l           将未收到应答的数据块(在ackQueue中)挪到dataQueue中;
l           循环执行:
1.      计算目前还活着的DataNode列表;
2.      选择一个主DataNode,通过DataNode RPC的recoverBlock方法启动它上面的恢复过程;
3.      处理可能的出错;
4.      处理恢复后Block可能的变化(如Stamp变化);
5.      调用createBlockOutputStream到DataNode的连接。
l           启动ResponseProcessor。
  这个过程涉及了DataNode上的recoverBlock方法和createBlockOutputStream中可能的Block恢复,是一个相当耗资源的方法,当系统出错的概率比较小,而且数据块上能恢复的数据很多(平均32M),还是值得这样做的。
  写的流程就分析到着,接下来我们来看流的关闭,这个过程也涉及了一系列的方法,它们的调用关系如下:
  


  flushInternal会一直等待到发送队列(包括可能的currentPacket)和应答队列都为空,这意味着数据都被DataNode顺利接收。
  sync作用和UNIX的sync类似,将写入数据持久化。它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。然后调用namenode.fsync,持久化命名空间上的数据。
  closeInternal比较复杂一点,它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。接着结束两个工作线程,关闭socket,最后调用amenode.complete,通知NameNode结束一次写操作。close方法先调用closeInternal,然后再本地的leasechecker中移除对应的信息。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312815-1-1.html 上篇帖子: Hadoop源代码分析(三五) 下篇帖子: Hadoop源代码分析(四一)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表