设为首页 收藏本站
查看: 830|回复: 0

[经验分享] hadoop datanode源码分析

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-6-6 12:06:51 | 显示全部楼层 |阅读模式


    DataNode源代码分析:  
    1.简介:DataNode是文件存储的基本单元,它将Block存储在本地文件系统中,保存了Block的Meta-data,  
           同时周期性地将所有存在的Block信息发送给NameNode  
      
    2.main启动DataNode  
      
    2.1:shell脚本启动DataNode  
    |-->hadoop/bin/start-all.sh  
    |-->start-dfs.sh  
    |-->"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR start datanode $dataStartOpt  
      
    2.2:main()函数启动分析  
    |-->StringUtils.startupShutdownMessage(DataNode.class, args, LOG); |设置启动和关闭日志信息  
        |-->toStartupShutdownString()  
        |-->Runtime.getRuntime().addShutdownHook() |通过设置钩子,完成日志结束标志  
    |-->DataNode datanode = createDataNode(args, null);  |见2.3  
    |-->datanode.join();  |主线程等待datanode线程执行完成  
      
    2.3 createDataNode(args, null) |用于创建Datanode实例,并启动Datanode线程  
    |-->DataNode dn = instantiateDataNode(args, conf);   
    |-->runDatanodeDaemon(dn);   
      
    2.3.1 instantiateDataNode(args, conf) |实例化DataNode结点  
    |-->parseArguments(args, conf)  |根据args解析加载conf的参数值  
    |-->String[] dataDirs = conf.getStrings("dfs.data.dir");  |获取datanode的本地存储路径  
    |-->makeInstance(dataDirs, conf);   
      
    2.3.2 makeInstance(dataDirs, conf); |创建Datanode实例  
    |-->for (int i = 0; i < dataDirs.length; i++)   
        |-->dirs.add(data);  
    |-->return new DataNode(conf, dirs);  |返回DataNode实例  
      
    2.3.3 runDatanodeDaemon(dn);  |运行DataNode结点  
    |-->dn.register();  |向namenode发送注册信息,namenode会通过心跳机制传递命令给datanode  
    |-->dn.dataNodeThread = new Thread(dn, dnThreadName);  
    |-->dn.dataNodeThread.setDaemon(true);  
    |-->dn.dataNodeThread.start();  
      
      
    3.DataNode实例化,通过startDataNode(conf, dataDirs)进行实例化  
    |-->setMachineName  |设置machineName  
        |-->machineName = conf.get("slave.host.name");   
        |-->machineName = DNS.getDefaultHost()  
    |-->nameNodeAddr = NameNode.getAddress(conf);  |获取nameNode的地址信息  
    |-->setSocketout时间  
        |-->his.socketTimeout =  conf.getInt("dfs.socket.timeout",HdfsConstants.READ_TIMEOUT);  
        |-->this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",  
                                              HdfsConstants.WRITE_TIMEOUT);  
    |-->this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024); |写包的大小,默认64K  
    |-->String address = NetUtils.getServerAddress(   |设置地址  
                     conf,  
                                    "dfs.datanode.bindAddress",   
                                    "dfs.datanode.port",  
                                    "dfs.datanode.address");)   
    |-->InetSocketAddress socAddr = NetUtils.createSocketAddr(address);  |创建本地socketaddress地址  
    |-->int tmpPort = socAddr.getPort();  |端口号  
    |-->storage = new DataStorage();      |DataStorage保存了存储相关的信息  
    |-->this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort); |构造一个注册器  
    |-->this.namenode = (DatanodeProtocol)  RPC.waitForProxy();  |通过动态代理生成namenode实例  
        |-->RPC.class中的getProxy()  
        |-->VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(  
                protocol.getClassLoader(), new Class[] { protocol },  
                new Invoker(addr, ticket, conf, factory));  
    |-->NamespaceInfo nsInfo = handshake();  |主要包含buildVersin和distributeUpgradeVersion,用于版本检验  
        |-->nsInfo = namenode.versionRequest();  
            |-->return namesystem.getNamespaceInfo();   
    |-->boolean simulatedFSDataset =         
            conf.getBoolean("dfs.datanode.simulateddatastorage", false);  
    |-->if (simulatedFSDataset) |判断一下是否是伪分布式,否则走正常判断,此处分析正常逻辑  
    |-->else   
        |-->storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);  
        |-->this.dnRegistration.setStorageInfo(storage); |将storage进行信息注册  
        |-->this.data = new FSDataset(storage, conf);    |根据storage和conf信息,生成FSDataset,用于数据块操作  
    |-->ServerSocket ss = (socketWriteTimeout > 0) ?   |初始化Socket服务器端,区分NIO和IO  
              ServerSocketChannel.open().socket() : new ServerSocket();  
    |-->Server.bind(ss, socAddr, 0);   
    |-->ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);  |设置接收的buffer缓存大小,默认64K  
    |-->selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),  
                                         tmpPort);  
    |-->this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer |初始化处理类dataXceiverServer  
                            (ss, conf, this));   
    |-->setInterval  |分别设置块状态信息间隔时间和心跳间隔时间  
        |-->blockReportInterval  
        |-->heartBeatInterval  
    |-->blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);   |blockScanner用于定时对文件块进行扫描  
    |-->this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort, |创建HttpServer,内部用jetty实现,用于页面监控  
            tmpInfoPort == 0, conf);  
    |-->ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),  |开启本地ipc服务,监听来自client和其它  
            conf.getInt("dfs.datanode.handler.count", 3), false, conf);              datanode结点的指令信息  
      
    4.Datanode线程运行 run()方法  
    |-->dataXceiverServer.start();  |启动dataXceiverServer服务器  
        |-->new Daemon(datanode.threadGroup,      |根据socket接送状态,启动DataXceiver,见4.1  
                new DataXceiver(s, datanode, this)).start();  
    |-->startDistributedUpgradeIfNeeded();      
    |-->offerService();    |与namenode完成心跳机制,并接受来自namenode的命令 ,见4.2  
      
    4.1 DataXceiver的run()方法  
    |-->in = new DataInputStream(      |获取来自namenode结点的流信息  
              new BufferedInputStream(NetUtils.getInputStream(s),   
                                      SMALL_BUFFER_SIZE));  
    |-->short version = in.readShort();  |读取版本信息  
    |-->boolean local = s.getInetAddress().equals(s.getLocalAddress()) |判断是否本地地址  
    |-->byte op = in.readByte(); |获取命令指令,主要有以下几种   
        |-->DataTransferProtocol.OP_READ_BLOCK     |读取block信息  
        |-->DataTransferProtocol.OP_WRITE_BLOCK:   |写block信息  
        |-->DataTransferProtocol.OP_READ_METADATA: |读取元数据信息  
        |-->DataTransferProtocol.OP_REPLACE_BLOCK  |替换块信息  
        |-->DataTransferProtocol.OP_COPY_BLOCK      |复制块信息  
        |-->DataTransferProtocol.OP_BLOCK_CHECKSUM |较验block信息  
      
    4.1.1 .OP_READ_BLOCK -->readBlock(DataInputStream in)  |读取数据块信息  
    |-->首先读取block描述信息  
        |-->long blockId = in.readLong();            
        |-->Block block = new Block( blockId, 0 , in.readLong());  
        |-->long startOffset = in.readLong();  
        |-->long length = in.readLong();  
        |-->String clientName = Text.readString(in); |Utf-9转码读取clientName信息  
    |-->创建输出流  
        |-->OutputStream baseStream = NetUtils.getOutputStream(s,   
            datanode.socketWriteTimeout);  
        |-->DataOutputStream out = new DataOutputStream(  
                     new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));  
    |-->blockSender = new BlockSender(block, startOffset, length,  
                true, true, false, datanode, clientTraceFmt);  
    |-->out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);   
    |-->long read = blockSender.sendBlock(out, baseStream, null);  |发送block数据  
    |-->if (blockSender.isBlockReadFully())  |如果读取的整个块信息,则需要校验块信息  
        |-->datanode.blockScanner.verifiedByClient(block);  
    |-->datanode.myMetrics.bytesRead.inc((int) read);  
    |-->datanode.myMetrics.blocksRead.inc();  
    |-->关闭相应流信息  
        |-->IOUtils.closeStream(out);  
        |-->IOUtils.closeStream(blockSender);      
            
    4.1.1.1 sendBlock(out, baseStream, null) |读取Block信息时,发送block数据流  
    |-->this.throttler = throttler;  |设置调节器,用于调节流速度与带宽的关系  
    |-->写头信息  
        |-->checksum.writeHeader(out);  
        |-->out.writeLong( offset );  
        |-->out.flush();  
    |-->设置packetSize大小  
        |-->int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;  |初始化设置,并根据流性质,设定大小  
    |-->ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);  
    |-->while (endOffset > offset)    |循环读,直到读取完成  
        |-->long len = sendChunks(pktBuf, maxChunksPerPacket,   
                                  streamForSendChunks);  
    |-->out.writeInt(0); 设置0为标志位,读取完成  
    |-->return totalRead;  
      
    4.1.1.2 sendChunks() 一共分为三个部分  
    |-->1:较验数据  
        |-->设置packet头信息  
            |-->pkt.putInt(packetLen)  
            |-->pkt.putLong(offset)  
            |-->pkt.putLong(seqno);  
            |-->pkt.put((byte)  
            |-->pkt.putInt(len);  
    |-->checksumIn.readFully(buf, checksumOff, checksumLen);  
    |-->2:读取流信息  
    |-->int dataOff = checksumOff + checksumLen;  
    |-->IOUtils.readFully(blockIn, buf, dataOff, len);  |从blockIn中读取block流信息  
    |-->for (int i=0; i<numChunks; i++)                 |针对每个checkSum的chunk块,进行较验  
        |-->checksum.update(buf, dOff, dLen);  
    |-->3:写流数据  
    |-->if (blockInPosition >= 0)  |如果blockPosition大于0,则为socketOutputSteam流  
        |-->SocketOutputStream sockOut = (SocketOutputStream)out;  
        |-->sockOut.write(buf, 0, dataOff);  
        |-->sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),   
                                    blockInPosition, len);  
      
    |-->else   
        |-->out.write(buf, 0, dataOff + len);  
    |-->throttler.throttle(packetLen);  |调节带宽与传输流  
    |-->return len;  |返回读取大小  
      
      
      
    4.1.1.3 writeBlock() |写block数据流,比读要复杂,涉及到与上下datanode节点的交互  
    1:读取头文件信息  
    |-->Block block = new Block(in.readLong(),   
            dataXceiverServer.estimateBlockSize, in.readLong());  
    |-->int pipelineSize = in.readInt();  
    |-->boolean isRecovery = in.readBoolean();  
    |-->String client = Text.readString(in)  
    |-->boolean hasSrcDataNode = in.readBoolean()  
    |-->srcDataNode.readFields(in);    |此时为发送命令的datanode节点,srcDataNode  
    |-->int numTargets = in.readInt(); |共需要传递的节点数,最后一个节点就是1  
    |-->DatanodeInfo targets[] = new DatanodeInfo[numTargets];   
    |-->for (int i = 0; i < targets.length; i++) |从流当中读取DatanodeInfo信息  
        |-->tmp.readFields(in);  
        |-->targets[i] = tmp;  
      
    2:创建输入、输出流,及socket端口  
    |-->mirrorOut = new DataOutputStream(   |创建下一节点的输出流  
                 new BufferedOutputStream(  
                             NetUtils.getOutputStream(mirrorSock, writeTimeout),  
                             SMALL_BUFFER_SIZE));   
    |-->mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));|创建下一节点的输入流  
    |-->replyOut = new DataOutputStream(    |响应上一节点的输出流  
                         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));   
    |-->Socket mirrorSock        |创建下一节点的端口号  
    |-->BlockReceiver blockReceiver  = new BlockReceiver(block, in, |创建block接收者,并写block数据  
              s.getRemoteSocketAddress().toString(),  
              s.getLocalSocketAddress().toString(),  
              isRecovery, client, srcDataNode, datanode);  
    3.数据传递  
    |-->mirrorNode = targets[0].getName();  
    |-->mirrorTarget = NetUtils.createSocketAddr(mirrorNode);  
    |-->mirrorSock = datanode.newSocket();  
    |-->NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);  |连接到下一节点datanode的客户端  
    |-->写下一节点输出流的版本等信息  
          |-->mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );  
              |--> mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );  
              |-->mirrorOut.writeLong( block.getBlockId() );  
              |-->mirrorOut.writeLong( block.getGenerationStamp() );  
              |-->mirrorOut.writeInt( pipelineSize );  
              |-->mirrorOut.writeBoolean( isRecovery );  
              |-->Text.writeString( mirrorOut, client );  
              |-->mirrorOut.writeBoolean(hasSrcDataNode);  
          |-->srcDataNode.write(mirrorOut);  |前提条件hasSrcDataNode  
          |-->mirrorOut.writeInt( targets.length - 1 );  
          |-->for ( int i = 1; i < targets.length; i++ )  
            |-->targets[i].write( mirrorOut );  
          |-->blockReceiver.writeChecksumHeader(mirrorOut); |写入检验头文件  
          |-->mirrorOut.flush();  
          |-->if (client.length() != 0)   
            |-->firstBadLink = Text.readString(mirrorIn);  |当为client端的时候,读取ack信息  
      
    4.接收block数据及发送miorror镜像  
    |-->blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,  
                                     mirrorAddr, null, targets.length);  
    |-->datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);  
    |-->datanode.blockScanner.addBlock(block);  
      
    4.1:接收block信息 receiveBlock()  
    |-->BlockMetadataHeader.writeHeader(checksumOut, checksum);  
    |-->responder = new Daemon(datanode.threadGroup,     
                                   new PacketResponder(this, block, mirrIn,   
                                                       replyOut, numTargets));  
    |-->responder.start();  
    |-->while (receivePacket() > 0) {}  |接收流数据,写磁盘,每一次writeBlock只写一次磁盘  
    |-->mirrorOut.writeInt(0);  
    |-->((PacketResponder)responder.getRunnable()).close();  
    |-->if (clientName.length() == 0)   
        |-->block.setNumBytes(offsetInBlock);  
        |-->datanode.data.finalizeBlock(block);  
      
      
    4.2:receivePacket()  |不断读packet数据至buf当中,循环至数据长度为o  
    |-->int payloadLen = readNextPacket();  |读取下一个packet,下述是处理和传输过程  
    |-->读取packet的头信息 ,然后回滚至最初位置  
        |-->buf.mark();  
        |-->buf.getInt()  
        |-->offsetInBlock = buf.getLong()  
        |-->long seqno = buf.getLong()  
        |-->lastPacketInBlock = (buf.get() != 0)  
        |-->int endOfHeader = buf.position();  |header头最后的位置  
        |-->buf.reset();  
    |-->setBlockPosition(offsetInBlock);  
    |-->写入下一DataNode节点镜像  
        |-->mirrorOut.write(buf.array(), buf.position(), buf.remaining());|整个Packet包往下传,  
                                            position和remaining确定包大小  
        |-->mirrorOut.flush();  |flush使之生效  
    |-->buf.position(endOfHeader);  |从文件头处开始处理  
    |-->int len = buf.getInt();     |获取data的长度初始值  
    |-->offsetInBlock += len;       |设置Block当中的offset值  
    |-->checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize  |获取checksumLen的长度  
    |-->int checksumOff = buf.position();  |此时bytebuffer的初始位置已经为真实的data数据  
    |-->int dataOff = checksumOff + checksumLen;  |data数据的存储右端值  
    |-->byte pktBuf[] = buf.array();  
    |-->buf.position(buf.limit());  |移到数据data的末尾  
    |-->verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff); |验证chunk信息  
    |-->out.write(pktBuf, dataOff, len); |数据写本地磁盘  
    |-->验证chunk是否为packet  
        |-->partialCrc.update(pktBuf, dataOff, len);  
        |-->checksumOut.write(pktBuf, checksumOff, checksumLen);  
    |-->flush();  
        |-->checksumOut.flush()  
        |-->out.flush()  
    |-->responder.getRunnable()).enqueue(seqno,lastPacketInBlock) |用responder返回packet包  
    |-->throttler.throttle(payloadLen);  
      
    |-->return payloadLen;  
      
      
    5.关闭流及端口  
    |-->IOUtils.closeStream(mirrorOut);  
    |-->IOUtils.closeStream(mirrorIn);  
    |-->IOUtils.closeStream(replyOut);  
    |-->IOUtils.closeSocket(mirrorSock);  
    |-->IOUtils.closeStream(blockReceiver);  



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-20283-1-1.html 上篇帖子: Hadoop学习总结之四:Map-Reduce的过程解析 下篇帖子: Namenode源代码分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表