设为首页 收藏本站
查看: 1083|回复: 0

[经验分享] Hadoop2.x ResourceManager启动之服务启动

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-3-21 15:14:03 | 显示全部楼层 |阅读模式
YARN ResourceManager启动之服务启动
RM是个综合服务类,内部包含了多个服务,所有的服务被放在列表中,通过循环逐个启动,其他服务的列表如下:

每个服务的启动都遵循一定的流程,服务的启动流程如下:
1、ResourceManager.java中的serviceStart调用父类的serviceStart
//ResourceManager.java  
protected void serviceStart() throws Exception {  
......  
   super.serviceStart();  
}  
2、父类CompositeService.serviceStart
protected void serviceStart() throws Exception {  
    //获得服务列表  
  List<Service> services = getServices();  
  if (LOG.isDebugEnabled()) {  
    LOG.debug(getName() + ": starting services, size=" + services.size());  
  }  
  //循环启动服务,每一次start调用最终都会进入服务本身的serviceStart函数  
  for (Service service : services) {  
    // start the service. If this fails that service  
    // will be stopped and an exception raised  
    service.start();  
  }  
  super.serviceStart();  
}  
3、进入父类的父类AbstractService
public void start() {  
        //服务是否已经启动?  
    if (isInState(STATE.STARTED)) {  
      return;  
    }  
    //enter the started state  
    synchronized (stateChangeLock) {  
      if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {  
        try {  
            //记录服务启动时间  
          startTime = System.currentTimeMillis();  
          //开始启动服务,此处会进入子类的函数+++  
          serviceStart();  
          //检测服务是否启动成功  
          if (isInState(STATE.STARTED)) {  
            //if the service started (and isn't now in a later state), notify  
            if (LOG.isDebugEnabled()) {  
              LOG.debug("Service " + getName() + " is started");  
            }  
            notifyListeners();  
          }  
        } catch (Exception e) {  
          noteFailure(e);  
          ServiceOperations.stopQuietly(LOG, this);  
          throw ServiceStateException.convert(e);  
        }  
      }  
    }  
  }  
4、最终进入子类serviceStart函数中启动服务
由此可以看出的由于服务的抽象对服务的统一管理带来了便利,如果后续再增加服务,只要按这个继承关系就可以将服务纳入统一管理了。
Token管理器服务线程启动
@Override  
public void serviceStart() throws Exception {  
    //Token管理器启动,具体作用以后分析,每个管理器由Timer驱动  
  amRmTokenSecretManager.start();  
  containerTokenSecretManager.start();  
  nmTokenSecretManager.start();  


  try {  
    //过期Token移除线程  
    rmDTSecretManager.startThreads();  
  } catch(IOException ie) {  
    throw new YarnRuntimeException("Failed to start secret manager threads", ie);  
  }  
  super.serviceStart();  
}  
Ping Checker服务:AbstractLivelinessMonitor的内部类,循环遍历已记录的NodeManager列表,当发现某个节点超过一段时间未汇报,则认为他已经挂掉,在列表中删除。
private class PingChecker implements Runnable {  
  @Override  
  public void run() {  
    while (!stopped && !Thread.currentThread().isInterrupted()) {  
      synchronized (AbstractLivelinessMonitor.this) {  
        //获得活动NM列表的迭代器  
        Iterator<Map.Entry<O, Long>> iterator =   
          running.entrySet().iterator();  


        //avoid calculating current time everytime in loop  
        long currentTime = clock.getTime();  
            //迭代每个节点,若发现节点超过expireInterval(yarn.nm.liveness-monitor.expiry-interval-ms控制,默认10分钟)  
            //则认为他已经挂掉,删除该节点  
        while (iterator.hasNext()) {  
          Map.Entry<O, Long> entry = iterator.next();  
          if (currentTime > entry.getValue() + expireInterval) {  
            iterator.remove();  
            expire(entry.getKey());  
            LOG.info("Expired:" + entry.getKey().toString() +   
                    " Timed out after " + expireInterval/1000 + " secs");  
          }  
        }  
      }  
      try {  
        //线程暂停monitorInterval( expireInterval/3)  
        Thread.sleep(monitorInterval);  
      } catch (InterruptedException e) {  
        LOG.info(getName() + " thread interrupted");  
        break;  
      }  
    }  
  }  
}  
ResourceManager Event Processor服务:
private final class EventProcessor implements Runnable {  
  @Override  
  public void run() {  


    SchedulerEvent event;  


    while (!stopped && !Thread.currentThread().isInterrupted()) {  
      try {  
        //取出事件  
        event = eventQueue.take();  
      } catch (InterruptedException e) {  
        LOG.error("Returning, interrupted : " + e);  
        return; // TODO: Kill RM.  
      }  


      try {  
        //处理事件  
        scheduler.handle(event);  
      } catch (Throwable t) {  
                .....  
      }  
    }  
  }  
}  
ResourceTrackerService服务:RPC服务器,实现了ResourceTracker接口,提供NM的注册和心跳服务
//ResourceTrackerService.java  
@Override  
protected void serviceStart() throws Exception {  
  super.serviceStart();  
  // ResourceTrackerServer authenticates NodeManager via Kerberos if  
  // security is enabled, so no secretManager.  
  //创建RPC服务器,该服务器实现ResourceTracker接口,handler数量由yarn.resourcemanager.  
resource-tracker.client.thread-count控制  
  Configuration conf = getConfig();  
  YarnRPC rpc = YarnRPC.create(conf);  
  this.server =  
    rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,  
        conf, null,  
        conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,   
            YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));  

  // Enable service authorization?  
  if (conf.getBoolean(  
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,   
      false)) {  
    refreshServiceAcls(conf, new RMPolicyProvider());  
  }  
    //服务启动  
  this.server.start();  
  conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,  
                         server.getListenerAddress());  
}  
RPC服务器组件启动,主要包括responder、listener、handler
public synchronized void start() {  
  responder.start();  
  listener.start();  
  handlers = new Handler[handlerCount];  

  for (int i = 0; i < handlerCount; i++) {  
    handlers = new Handler(i);  
    handlers.start();  
  }  
}  
服务于客户端的RPC server:ClientRMService,类似ResourceTrackerService,该服务器实现了ApplicationClientProtocol接口,RPC server的启动都一样,只是实现的协议不同
@Override  
protected void serviceStart() throws Exception {  
  Configuration conf = getConfig();  
  YarnRPC rpc = YarnRPC.create(conf);  
  //handler数量由yarn.resourcemanager.client.thread-count控制  
  this.server =     
    rpc.getServer(ApplicationClientProtocol.class, this,  
          clientBindAddress,  
          conf, this.rmDTSecretManager,  
          conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,   
              YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));  

  // Enable service authorization?  
  if (conf.getBoolean(  
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,   
      false)) {  
    refreshServiceAcls(conf, new RMPolicyProvider());  
  }  

  this.server.start();  
  clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,  
                                             server.getListenerAddress());  
  super.serviceStart();  
}  
服务于管理员的RPC server:AdminService ,handler数量由yarn.resourcemanager.admin.client.thread-count控制,该服务实现ResourceManagerAdministrationProtocol接口
protected void startServer() throws Exception {  
  Configuration conf = getConfig();  
  YarnRPC rpc = YarnRPC.create(conf);  
  this.server = (Server) rpc.getServer(  
      ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,  
      conf, null,  
      conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,  
          YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));  
.......  
  this.server.start();  
  conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,  
      server.getListenerAddress());  
}  
AsyncDispatcher event handler服务的启动:
调用层次比较深,只关注关键部分,调用栈的顶层:
AsyncDispatcher类直接继承自AbstractService,服务启动时会先调用父类的同名函数
@Override  
protected void serviceStart() throws Exception {  
  //调用父类同名函数,实际啥都木有做,以后全局初始化之类的操作可能会放进去  
  super.serviceStart();  
  //创建一个新的线程,并启动,主要的业务关系包含在createThread函数中  
  eventHandlingThread = new Thread(createThread());  
  eventHandlingThread.setName("AsyncDispatcher event handler");  
  eventHandlingThread.start();  
}  
下面看AsyncDispatcher的线程执行体,由上面的createThread创建,该线程会进入主循环,并一直等待事件队列,一旦有新的事件到达,便执行dispatch(event),将事件分发出去
Runnable createThread() {  
  return new Runnable() {  
    @Override  
    public void run() {  
        //查看服务标识和线程状态  
      while (!stopped && !Thread.currentThread().isInterrupted()) {  
        drained = eventQueue.isEmpty();  
        // blockNewEvents is only set when dispatcher is draining to stop,  
        // adding this check is to avoid the overhead of acquiring the lock  
        // and calling notify every time in the normal run of the loop.  
        //加入该检测是防止事件过多导致该线程压力过大  
        if (blockNewEvents) {  
          synchronized (waitForDrained) {  
            if (drained) {  
              waitForDrained.notify();  
            }  
          }  
        }  
        Event event;  
        try {  
            //在队列中取出事件  
          event = eventQueue.take();  
        } catch(InterruptedException ie) {  
          if (!stopped) {  
            LOG.warn("AsyncDispatcher thread interrupted", ie);  
          }  
          return;  
        }  
        if (event != null) {  
            //分发事件  
          dispatch(event);  
        }  
      }  
    }  
  };  
}  
RM的服务类型还是比较多的,而且好多服务都是多线程的,比如RPCserver,默认的handler就有50个,而且有多个RPC server,RM中整体的服务列表服下:
Service org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService   
Service org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer   
Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED  
Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED  
Service org.apache.hadoop.yarn.server.resourcemanager.NodesListManager   
Service org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher   
Service NMLivelinessMonitor in state NMLivelinessMonitor: INITED  
Service org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService   
Service org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService   
Service org.apache.hadoop.yarn.server.resourcemanager.ClientRMService   
Service org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher   
Service Dispatcher in state Dispatcher: INITED  
Service org.apache.hadoop.yarn.server.resourcemanager.AdminService   



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-16047-1-1.html 上篇帖子: 一台机器安装多套hadoop环境 端口冲突的解决 下篇帖子: hadoop 常用配置项
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表