设为首页 收藏本站
查看: 843|回复: 0

[经验分享] hadoop DataNode实现分析

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-11 10:38:14 | 显示全部楼层 |阅读模式
  在前面说hadoop整体实现的时候, 说过DataNode的需要完成的首要任务是K-V存储。
DSC0000.png DSC0001.png                                  
  
  第二个功能是 完成和namenode 通信 ,这个通过IPC 心跳连接实现。此外还有和客户端 其它datanode之前的信息交换。
  第 三个功能是 完成和客户端还有其它节点的大规模通信,这个需要直接通过socket 协议实现。
  
  下面开始分析源代码,看看DataNode是如何实现这些功能的。
  
  分析代码采取自定向下的分析方式, 看看代码中做了啥,然后分析这些代码的作用。
  首先看Datanode实现的接口。
  
       
  •      
    public class DataNode extends Configured     
  •      
        implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,     
  •      
        Runnable, DataNodeMXBean {  
    它实现了 InterDatanodeProtocol, ClientDatanodeProtocol, 这两个重要接口。 作用和之前分析haoop IPC的时候提到过, 为了是客户端 和其它datanode节点远程调用本dataNode节点方法的时候,提供方法实际运行的对象。
  我们可以看到它并没有实现和datanode的接口,因为datanode是主动和nameNode联系,nameNode从来不会主动调用dataNode上的方法。
  在main 方法中主要 通过一系列调用创建了datanode对象。
  之后datanode的初始化工作主要由 startDataNode()来完成, 这是一个很复杂的方法,我们来一点一点的分析。
  
       
  •      
    void startDataNode(Configuration conf,     
  •      
                         AbstractList dataDirs, SecureResources resources     
  •      
                         ) throws IOException {     
  •      
        if(UserGroupInformation.isSecurityEnabled() && resources == null)     
  •      
          throw new RuntimeException("Cannot start secure cluster without " +     
  •      
                "privileged resources.");     
  •      
        // connect to name node     
  •      
        this.namenode = (DatanodeProtocol)     
  •      
          RPC.waitForProxy(DatanodeProtocol.class,     
  •      
                           DatanodeProtocol.versionID,     
  •      
                           nameNodeAddr,     
  •      
                           conf);     
  •      
    这个是通过反射获取同dataNode节点通信的代理对象     
  •      
        // get version and id info from the name-node     
  •      
        NamespaceInfo nsInfo = handshake(); //立刻与名字节点通信     
  •      
        StartupOption startOpt = getStartupOption(conf);     
  •      
        assert startOpt != null : "Startup option must be set.";     
  •      
          storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);     
  •      
          // adjust     
  •      
          this.dnRegistration.setStorageInfo(storage);     
  •      
          // initialize data node internal structure     
  •      
          this.data = new FSDataset(storage, conf);     
  •      
    // 创建数据存储KV 的对象 这个后面还要再细分析。     
  •      
        }     
  •      
        this.threadGroup = new ThreadGroup("dataXceiverServer");     
  •      
        this.dataXceiverServer = new Daemon(threadGroup,     
  •      
            new DataXceiverServer(ss, conf, this));     
  •      
        this.threadGroup.setDaemon(true); // 创建流接口服务器 DataXServer   这个需要后面再分析     
  •      
        ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),     
  •      
            conf.getInt("dfs.datanode.handler.count", 3), false, conf, //创建IPC服务器。     
  •      
            blockTokenSecretManager);     
  •    
  •      
      }  
    上面代码分析中我们留了两个之后还要分析的方法,现在来看一下。
  第一个是FsdataSet.
  我们需要考虑的问题是 hadoop以64M大小为单位作为一个文件的大小 存储在linux 文件系统 上。 当文件多了,就有一个效率问题,同一个文件夹下有过多的文件
  和文件目录过深都不利于检索速度(这个与linux文件系统inode结构有关,这里暂不讨论这个) 。所以我们这里要设计一个结构 需要创建文件夹 但文件夹目录不能过深。
  此外 hadoop 还考虑了一个优化问题,如果一个datanode节点上插有多块硬盘的话,怎么提高并行吞吐量。好,有了这些我们来看具体实现。
DSC0002.png
  一个FSdir对于着一个存储目录,一个FSVolume 对应着一个用户配置的数据目录(应该为一个磁盘最好) FsVolumeSet存储着所有的FSVolume对象。
  在FsDataSet中海油一个最重要的成员变量,volumeMap 就是这个成员变量存储了 每一个Block 和它对应的存储路径等信息。
  
       
  •      
    HashMap volumeMap = new HashMap();;  
    第二个是 DataXServer
  当往数据节点中填入数据或者数据节点之间做负载均衡的时候显然无法 使用Hdoop IPC 因为hadoop的IPC 在socket之上封装了方法的调用,如果在这之上封装一个大规模数据传输的方法,显然效率上不如直接用socket通信。
  
       
  •      
    ServerSocket ss;     
  •      
        if(secureResources == null) {     
  •      
          ss = (socketWriteTimeout > 0) ?     
  •      
            ServerSocketChannel.open().socket() : new ServerSocket();     
  •      
          Server.bind(ss, socAddr, 0);     
  •      
        } else {     
  •      
          ss = resources.getStreamingSocket();     
  •      
        }     
  •      
        ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);     
  •      
    //初始化处理类dataXceiverServer     
  •      
       this.threadGroup = new ThreadGroup("dataXceiverServer");     
  •      
       this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));     
  •      
       this.threadGroup.setDaemon(true); // auto destroy when empty  
    DataXceiverServer 是个线程 我们看一下它的ruan方法
  
       
  •      
    Socket s = ss.accept();     
  •      
            s.setTcpNoDelay(true);     
  •      
            new Daemon(datanode.threadGroup,     
  •      
                new DataXceiver(s, datanode, this)).start();  
    我们再看一下 DataXceiver的run方法
  
       
  •      
    public void run() {     
  •      
        DataInputStream in=null;     
  •      
        try {     
  •      
          in = new DataInputStream(     
  •      
              new BufferedInputStream(NetUtils.getInputStream(s),     
  •      
                                      SMALL_BUFFER_SIZE));     
  •      
          short version = in.readShort();     
  •      
          if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {     
  •      
            throw new IOException( "Version Mismatch" );     
  •      
          }     
  •      
          boolean local = s.getInetAddress().equals(s.getLocalAddress());     
  •      
          byte op = in.readByte();     
  •      
          // Make sure the xciver count is not exceeded     
  •      
          int curXceiverCount = datanode.getXceiverCount();     
  •      
          if (curXceiverCount > dataXceiverServer.maxXceiverCount) {     
  •      
            throw new IOException("xceiverCount " + curXceiverCount     
  •      
                                  + " exceeds the limit of concurrent xcievers "     
  •      
                                  + dataXceiverServer.maxXceiverCount);     
  •      
          }     
  •      
          long startTime = DataNode.now();     
  •      
          switch ( op ) {     
  •      
          case DataTransferProtocol.OP_READ_BLOCK:     
  •      
            readBlock( in );     
  •      
            datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);     
  •      
            if (local)     
  •      
              datanode.myMetrics.incrReadsFromLocalClient();     
  •      
            else     
  •      
              datanode.myMetrics.incrReadsFromRemoteClient();     
  •      
            break;     
  •      
          case DataTransferProtocol.OP_WRITE_BLOCK:     
  •      
            writeBlock( in );     
  •      
            datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);     
  •      
            if (local)     
  •      
              datanode.myMetrics.incrWritesFromLocalClient();     
  •      
            else     
  •      
              datanode.myMetrics.incrWritesFromRemoteClient();     
  •      
            break;     
  •      
          case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination     
  •      
            replaceBlock(in);     
  •      
            datanode.myMetrics.addReplaceBlockOp(DataNode.now() - startTime);     
  •      
            break;     
  •      
          case DataTransferProtocol.OP_COPY_BLOCK:     
  •      
                // for balancing purpose; send to a proxy source     
  •      
            copyBlock(in);     
  •      
            datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);     
  •      
            break;     
  •      
          case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block     
  •      
            getBlockChecksum(in);     
  •      
            datanode.myMetrics.addBlockChecksumOp(DataNode.now() - startTime);     
  •      
            break;     
  •      
          default:     
  •      
            throw new IOException("Unknown opcode " + op + " in data stream");     
  •      
          }     
  •      
        } catch (Throwable t) {     
  •      
          LOG.error(datanode.dnRegistration + ":DataXceiver",t);     
  •      
        } finally {     
  •      
          LOG.debug(datanode.dnRegistration + ":Number of active connections is: "     
  •      
                                   + datanode.getXceiverCount());     
  •      
          IOUtils.closeStream(in);     
  •      
          IOUtils.closeSocket(s);     
  •      
          dataXceiverServer.childSockets.remove(s);     
  •      
        }     
  •      
      }  
    重点在这句
  
       
  •      
    byte op = in.readByte();  
    应该是根据流中的事先约定 来 第一个字节 来决定是

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85466-1-1.html 上篇帖子: 【Hadoop代码笔记】Hadoop作业提交之JobTracker接收作业提交 下篇帖子: CHD4B1(hadoop-0.23)实现NameNode HA安装配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表