设为首页 收藏本站
查看: 1102|回复: 0

[经验分享] Hadoop中DataNode与NameNode之间的心跳机制

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-6-6 12:08:10 | 显示全部楼层 |阅读模式
DataNode: 用于存储HDFS的数据,
public class DataNode extends Configured
    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {。。。}

1,实现了InterDatanodeProtocol, ClientDatanodeProtocol来供与客户端以及其他DataNode的通信,接受请求。
2,实现Runnable 接口

首先是来看下run()方法
  public void run() {
    LOG.info(dnRegistration + "In DataNode.run, data = " + data);


    // start dataXceiveServer
    dataXceiverServer.start();//启动    dataXceiverServer服务器
        
    while (shouldRun) {
      try {
        startDistributedUpgradeIfNeeded();//Hadoop文件系统升级监测
        offerService();//循环,主要是定时向NameNode发送心跳,响应并执行NameNode返回的命令

      } catch (Exception ex) {
        LOG.error("Exception: " + StringUtils.stringifyException(ex));
        if (shouldRun) {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException ie) {
          }
        }
      }
    }
        
    LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
    shutdown();
  }

①,    dataXceiverServer.start();开启数据监听守护进程,有数据到来时,开启一个DataXceiver线程
DataXceiverServer.java

public void run() {
    while (datanode.shouldRun) {
      try {
        Socket s = ss.accept();
        s.setTcpNoDelay(true);
        new Daemon(datanode.threadGroup,
            new DataXceiver(s, datanode, this)).start();//启动DataXceiver
      } catch (SocketTimeoutException ignored) {
        // wake up to see if should continue to run
      } catch (IOException ie) {
        LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
                                + StringUtils.stringifyException(ie));
      } catch (Throwable te) {
        LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:"
                                 + StringUtils.stringifyException(te));
        datanode.shouldRun = false;
      }
    }
    try {
      ss.close();
    } catch (IOException ie) {
      LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
                              + StringUtils.stringifyException(ie));
    }
  }

在DataXceiver中主要是从DataXceiverServer读写数据
-------DataXceiver.java----------
  /**
   * Read/write data from/to the DataXceiveServer.
   */

  public void run() {
    DataInputStream in=null;
    try {
      in = new DataInputStream(
          new BufferedInputStream(NetUtils.getInputStream(s),
                                  SMALL_BUFFER_SIZE));//获取NameNode的流信息
      short version = in.readShort();//获取版本信息
      if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
        throw new IOException( "Version Mismatch" );
      }
      boolean local = s.getInetAddress().equals(s.getLocalAddress());
      byte op = in.readByte();
      // Make sure the xciver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
        throw new IOException("xceiverCount " + curXceiverCount
                              + " exceeds the limit of concurrent xcievers "
                              + dataXceiverServer.maxXceiverCount);
      }
      long startTime = DataNode.now();
//执行相应的操作
      switch ( op ) {
      case DataTransferProtocol.OP_READ_BLOCK://读取block信息
        readBlock( in );
        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.readsFromLocalClient.inc();
        else
          datanode.myMetrics.readsFromRemoteClient.inc();
        break;
      case DataTransferProtocol.OP_WRITE_BLOCK://写block信息
        writeBlock( in );
        datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.writesFromLocalClient.inc();
        else
          datanode.myMetrics.writesFromRemoteClient.inc();
        break;
      case DataTransferProtocol.OP_READ_METADATA://读取元数据信息
        readMetadata( in );
        datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
        replaceBlock(in);
        datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_COPY_BLOCK://复制block信息
            // for balancing purpose; send to a proxy source
        copyBlock(in);
        datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block---校验块信息
        getBlockChecksum(in);
        datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
        break;
      default:
        throw new IOException("Unknown opcode " + op + " in data stream");
      }
    } catch (Throwable t) {
      LOG.error(datanode.dnRegistration + ":DataXceiver",t);
    } finally {
      LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
                               + datanode.getXceiverCount());
      IOUtils.closeStream(in);
      IOUtils.closeSocket(s);
      dataXceiverServer.childSockets.remove(s);
    }
  }

-------------------------------------------------------------------------------分割线-------------------------------------------------------------
       startDistributedUpgradeIfNeeded();
        offerService();

这里还是如题说说心跳机制吧,心跳由DataNode发起,
心跳首先是由DataNode的一个线程开启的,一直循环调用  offerService();方法,DataNode在这个方法中发送心跳
  while (shouldRun) {
      try {
        long startTime = now();


        //
        // Every so often, send heartbeat or block-report
        //
        
        if (startTime - lastHeartbeat > heartBeatInterval) {
          //
          // All heartbeat messages include following info:
          // -- Datanode name
          // -- data transfer port
          // -- Total capacity
          // -- Bytes remaining
          //
          lastHeartbeat = startTime;
//定期发送心跳
          DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
                                                       data.getCapacity(),
                                                       data.getDfsUsed(),
                                                       data.getRemaining(),
                                                       xmitsInProgress.get(),
                                                       getXceiverCount());

          myMetrics.heartbeats.inc(now() - startTime);
          //LOG.info("Just sent heartbeat, with name " + localName);
//响应心跳执行返回的命令
          if (!processCommand(cmds))
            continue;
        }

在这个方法中通过调用NameNode的RPC调用namenode.sendHeartbeat()发送心跳,之后就是处理返回的请求
-------------------------------------再来看看NameNode端------------------------------------------------------------------------------
  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
                                       long capacity,
                                       long dfsUsed,
                                       long remaining,
                                       int xmitsInProgress,
                                       int xceiverCount) throws IOException {
    verifyRequest(nodeReg);
    return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
        xceiverCount, xmitsInProgress);
  }


最终是由namesystem来处理的,二namesystem就是FSNamesystem的一个实例(在NameNode.java中public FSNamesystem namesystem; )
FSNamesystem处理完后返回一个an array of datanode commands 交由DataNode处理,


另外:
  /**
   * Periodically calls heartbeatCheck().
   */
  class HeartbeatMonitor implements Runnable {
    /**
     */
    public void run() {
      while (fsRunning) {
        try {
          heartbeatCheck();
        } catch (Exception e) {
          FSNamesystem.LOG.error(StringUtils.stringifyException(e));
        }
        try {
          Thread.sleep(heartbeatRecheckInterval);
        } catch (InterruptedException ie) {
        }
      }
    }
  }

其中的HeartbeatMonitor 处理失效的DataNode,将其移除removeDatanode(nodeInfo);



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-20286-1-1.html 上篇帖子: Hadoop Map/Reduce执行全流程关键代码 下篇帖子: Hadoop源码分析之一(RPC机制之Server)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表