设为首页 收藏本站
查看: 1252|回复: 0

[经验分享] Hadoop源代码分析之DatanodeProtocol(sendHeartbeat方法的调用)

[复制链接]

尚未签到

发表于 2018-11-1 09:07:17 | 显示全部楼层 |阅读模式
  1、心跳机制
  

  
心跳的机制大概是这样的:
  
1) master启动的时候,会开一个ipc server在那里。
  
2) slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。
  

  2、找到心跳的代码
  

  
拿namenode和datanode来说,在datanode的offerService方法中,每隔3秒向namenode发送心跳
    3、心跳的底层细节一:datanode怎么获得namenode对象的?
  

  
首先,DataNode类中,有一个namenode的成员变量:
  Java代码  


  • public class DataNode extends Configured
  •     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
  •   ...
  •   public DatanodeProtocol namenode = null;
  •   ...
  • }

  


  
public>  

  
  ...
  

  
  public DatanodeProtocol namenode = null;
  

    4、心跳的底层细节二:看看Invoker
  

  
Invoker类是org.apache.hadoop.ipc.RPC类的一个静态内部类:
  Java代码  


  • private static class Invoker implements InvocationHandler {

  


  
   private static>  

  在这个类中,看invoke方法
  5、心跳的底层细节三:Invocation
  

  
Invocation类是org.apache.hadoop.ipc.RPC类的一个静态内部类
  

  
没有什么业务逻辑方法,主要作用就是一个VO
  6、心跳的底层细节四:client类的call方法
  

  
接下来重点看client类的call方法:
  Java代码  


  • public Writable call(Writable param, InetSocketAddress addr,
  •                      Class protocol, UserGroupInformation ticket)
  •                      throws InterruptedException, IOException {

  •   Call call = new Call(param);
  • // 将Invocation转化为Call
  •   Connection connection = getConnection(addr, protocol, ticket, call);
  • // 连接远程服务器
  •   connection.sendParam(call);                 // send the parameter
  • // 将“序列化”后的call发给过去
  •   boolean interrupted = false;
  •   synchronized (call) {
  •     while (!call.done) {
  •       try {
  •         call.wait();                           // wait for the result
  • // 等待调用结果
  •       } catch (InterruptedException ie) {
  •         // save the fact that we were interrupted
  •         interrupted = true;
  •       }
  •     }

  •     if (interrupted) {
  •       // set the interrupt flag now that we are done waiting
  •       Thread.currentThread().interrupt();
  •     }

  •     if (call.error != null) {
  •       if (call.error instanceof RemoteException) {
  •         call.error.fillInStackTrace();
  •         throw call.error;
  •       } else { // local exception
  •         throw wrapException(addr, call.error);
  •       }
  •     } else {
  •       return call.value;
  • // 返回
  •     }
  •   }
  • }
  

  
  public Writable call(Writable param, InetSocketAddress addr,
  

  
                       Class protocol, UserGroupInformation ticket)
  

  
                       throws InterruptedException, IOException {
  

  

  

  
    Call call = new Call(param);  
  

  
   // 将Invocation转化为Call
  

  
    Connection connection = getConnection(addr, protocol, ticket, call);
  

  
   // 连接远程服务器
  

  
    connection.sendParam(call);                 // send the parameter
  

  
   // 将“序列化”后的call发给过去
  

  
    boolean interrupted = false;
  

  
    synchronized (call) {
  

  
      while (!call.done) {
  

  
        try {
  

  
          call.wait();                           // wait for the result
  

  
   // 等待调用结果
  

  
        } catch (InterruptedException ie) {
  

  
          // save the fact that we were interrupted
  

  
          interrupted = true;
  

  
        }
  

  
      }
  

  

  

  
      if (interrupted) {
  

  
        // set the interrupt flag now that we are done waiting
  

  
        Thread.currentThread().interrupt();
  

  
      }
  

  

  

  
      if (call.error != null) {
  

  
        if (call.error instanceof RemoteException) {
  

  
          call.error.fillInStackTrace();
  

  
          throw call.error;
  

  
        } else { // local exception
  

  
          throw wrapException(addr, call.error);
  

  
        }
  

  
      } else {
  

  
        return call.value;
  

  
   // 返回
  

  
      }
  

  
    }
  

  
  }
  

  7、现在,一目了然了
  Java代码  


  • datanode向namenode发送heartbeat过程是这样的:

  •     a) 在datanode初始化获得namenode的proxy
  •     b) 在datanode上,调用namenode proxy的heartbeat方法:
  •         namenode.sendHeartbeat(dnRegistration,
  •                                                        data.getCapacity(),
  •                                                        data.getDfsUsed(),
  •                                                        data.getRemaining(),
  •                                                        xmitsInProgress.get(),
  •                                                        getXceiverCount());
  •     c) 在datanode上的namenode动态代理类将这个调用包装成(或者叫“序列化成”)一个Invocation对象,并调用client.call方法
  •     d) client call方法将Invocation转化为Call对象
  •     e) client 将call发送到真正的namenode服务器
  •     f) namenode接收后,转化成namenode端的Call,并process后,通过Responder发回来!
  •     g) datanode接收结果,并将结果转化为DatanodeCommand[]




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-629192-1-1.html 上篇帖子: Hadoop分布式文件系统:结构与设计 下篇帖子: hadoop安装配置:使用cloudrea-linuxjcq
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表