  // Enable service authorization?
  if (conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
      false)) {
    refreshServiceAcls(conf, new RMPolicyProvider());
  }
  this.server.start();
  clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
      server.getListenerAddress());
  super.serviceStart();
}
The RPC server that serves administrators is AdminService. Its handler count is controlled by yarn.resourcemanager.admin.client.thread-count, and the service implements the ResourceManagerAdministrationProtocol interface.
protected void startServer() throws Exception {
  Configuration conf = getConfig();
  YarnRPC rpc = YarnRPC.create(conf);
  this.server = (Server) rpc.getServer(
      ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
      conf, null,
      conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
          YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
  .......
  this.server.start();
  conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
      server.getListenerAddress());
}
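Starting the AsyncDispatcher event handler service:
The call hierarchy is fairly deep, so only the key parts are covered here, beginning at the top of the call stack:
AsyncDispatcher directly extends AbstractService. When the service starts, it first calls the parent class's method of the same name.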
@Override
protected void serviceStart() throws Exception {
  // Call the parent method of the same name. It currently does nothing,
  // but global initialization and similar work may be added there later.
  super.serviceStart();
  // Create and start a new thread; the main logic lives in createThread().
  eventHandlingThread = new Thread(createThread());
  eventHandlingThread.setName("AsyncDispatcher event handler");
  eventHandlingThread.start();
}
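Next, look at the body of the AsyncDispatcher thread, which is built by the createThread call above. The thread enters its main loop and waits on the event queue; as soon as a new event arrives, it calls dispatch(event) to hand the event off.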
Runnable createThread() {
  return new Runnable() {
    @Override
    public void run() {
      // Check the stop flag and the thread's interrupt status
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        drained = eventQueue.isEmpty();
        // blockNewEvents is only set when dispatcher is draining to stop,
        // adding this check is to avoid the overhead of acquiring the lock
        // and calling notify every time in the normal run of the loop.
        // In other words, the lock and notify() are only needed while
        // draining, not on every pass through the loop.
        if (blockNewEvents) {
          synchronized (waitForDrained) {
            if (drained) {
              waitForDrained.notify();
            }
          }
        }
        Event event;
        try {
          // Take the next event from the queue; take() blocks while it is empty
          event = eventQueue.take();
        } catch(InterruptedException ie) {
          if (!stopped) {
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
          // Dispatch the event
          dispatch(event);
        }
      }
    }
  };
}
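To make the loop above easier to play with outside Hadoop, here is a minimal sketch of the same pattern: one thread blocks on a queue and hands each event to a registered handler. MiniDispatcher, register, post and stop are hypothetical names made up for this illustration, not Hadoop APIs. The sketch picks a handler by the event's Java class, which is a simplification, and it leaves out the drain logic (blockNewEvents / waitForDrained).

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for an async dispatcher (not Hadoop code).
class MiniDispatcher {
    private final BlockingQueue<Object> eventQueue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, Consumer<Object>> handlers = new ConcurrentHashMap<>();
    private volatile boolean stopped = false;
    private Thread eventHandlingThread;

    // Assumption: handlers are keyed by the event's Java class.
    void register(Class<?> eventType, Consumer<Object> handler) {
        handlers.put(eventType, handler);
    }

    // Producers only enqueue; they never run handler code themselves.
    void post(Object event) {
        eventQueue.add(event);
    }

    void start() {
        eventHandlingThread = new Thread(() -> {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
                Object event;
                try {
                    event = eventQueue.take(); // blocks until an event arrives
                } catch (InterruptedException ie) {
                    return; // interrupted, typically by stop()
                }
                Consumer<Object> handler = handlers.get(event.getClass());
                if (handler != null) {
                    handler.accept(event);
                }
            }
        });
        eventHandlingThread.setName("MiniDispatcher event handler");
        eventHandlingThread.start();
    }

    // Events still sitting in the queue at this point are dropped.
    void stop() throws InterruptedException {
        stopped = true;
        eventHandlingThread.interrupt();
        eventHandlingThread.join();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniDispatcher dispatcher = new MiniDispatcher();
        CountDownLatch done = new CountDownLatch(2);
        dispatcher.register(String.class, e -> {
            System.out.println("handled " + e);
            done.countDown();
        });
        dispatcher.start();
        dispatcher.post("event-1");
        dispatcher.post("event-2");
        done.await(); // wait until both events have been handled
        dispatcher.stop();
    }
}
```

Because a single thread drains the queue, events are handled in the order they were posted, and one slow handler delays everything queued behind it.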
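The RM runs quite a few kinds of services, and many of them are multi-threaded. The RPC server, for example, has 50 handlers by default, and there are several RPC servers. The overall list of services in the RM is as follows: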
Service org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService
Service org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer
Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED
Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED
Service org.apache.hadoop.yarn.server.resourcemanager.NodesListManager
Service org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher
Service NMLivelinessMonitor in state NMLivelinessMonitor: INITED
Service org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService
Service org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
Service org.apache.hadoop.yarn.server.resourcemanager.ClientRMService
Service org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher
Service Dispatcher in state Dispatcher: INITED
Service org.apache.hadoop.yarn.server.resourcemanager.AdminService
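The services listed above are children of one parent service, and the INITED entries show that each of them carries a lifecycle state. As a rough sketch of how a parent can drive such children through init and start, consider the code below. LifecycleSketch, MiniService and MiniCompositeService are hypothetical names for illustration only; the state names are modeled on Hadoop's, but the stopped state, listeners and error handling are all left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified service lifecycle (not Hadoop code).
class LifecycleSketch {
    enum State { NOTINITED, INITED, STARTED }

    static abstract class MiniService {
        private final String name;
        private State state = State.NOTINITED;

        MiniService(String name) { this.name = name; }

        String getName() { return name; }
        State getState() { return state; }

        // Template methods: the base class owns the state transitions,
        // subclasses only fill in serviceInit() / serviceStart().
        final void init() {
            serviceInit();
            state = State.INITED;
        }

        final void start() {
            if (state != State.INITED) {
                throw new IllegalStateException(name + " is in state " + state);
            }
            serviceStart();
            state = State.STARTED;
        }

        protected void serviceInit() { }
        protected void serviceStart() { }
    }

    // A parent that inits and starts its children in registration order.
    static class MiniCompositeService extends MiniService {
        private final List<MiniService> children = new ArrayList<>();

        MiniCompositeService(String name) { super(name); }

        void addService(MiniService child) { children.add(child); }

        @Override
        protected void serviceInit() {
            for (MiniService child : children) {
                child.init();
            }
        }

        @Override
        protected void serviceStart() {
            for (MiniService child : children) {
                child.start();
            }
        }
    }

    public static void main(String[] args) {
        MiniCompositeService parent = new MiniCompositeService("parent");
        MiniService child = new MiniService("child") { };
        parent.addService(child);
        parent.init();
        System.out.println("Service " + child.getName() + " in state " + child.getState());
        parent.start();
        System.out.println("Service " + child.getName() + " in state " + child.getState());
    }
}
```

In this sketch a child must be initialized before it can be started, which is why the parent runs init over all children before any of them starts.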