package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.
*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.util.concurrent.SettableFuture;
@SuppressWarnings(
"unchecked")
@Private
@Unstable
public abstract><T extends SchedulerApplicationAttempt, N extends SchedulerNode> extends AbstractService implements ResourceScheduler {
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
// Nodes in the cluster, indexed by NodeId// 在集群中的节点,用NodeId索引 protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
// Whole capacity of the cluster
// 集群全部容量
protected Resource clusterResource = Resource.newInstance(0, 0);
protected Resource minimumAllocation;
private Resource maximumAllocation;
private Resource configuredMaximumAllocation;
private int maxNodeMemory = -1;
private int maxNodeVCores = -1;
private final ReadLock maxAllocReadLock;
private final WriteLock maxAllocWriteLock;
private boolean useConfiguredMaximumAllocationOnly = true;
private long configuredMaximumAllocationWaitTime;
protected RMContext rmContext;
/*
* All schedulers which are inheriting AbstractYarnScheduler should use
* concurrent version of 'applications' map.
*/
// 所有继承自AbstractYarnScheduler的调度器应该使用并行版本的'applications' map
protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
protected int nmExpireInterval;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
/**
 * Construct the service.
 *
 * <p>Also creates the read/write lock that guards the maximum-allocation
 * state and stores its two halves in their respective fields.
 *
 * @param name service name
 */
public AbstractYarnScheduler(String name) {
  super(name);
  final ReentrantReadWriteLock maxAllocLock = new ReentrantReadWriteLock();
  this.maxAllocWriteLock = maxAllocLock.writeLock();
  this.maxAllocReadLock = maxAllocLock.readLock();
}
/**
 * Initializes the scheduler service.
 *
 * <p>Reads the NodeManager expiry interval and the work-preserving-recovery
 * scheduling wait time from {@code conf} (falling back to the corresponding
 * {@link YarnConfiguration} defaults), schedules the release-request cache
 * cleanup and finally delegates to the superclass.
 *
 * @param conf configuration to read the settings from
 * @throws Exception if the superclass initialization fails
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  final int nmExpiryMs = conf.getInt(
      YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
      YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
  final long recoveryWaitMs = conf.getLong(
      YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
      YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);

  nmExpireInterval = nmExpiryMs;
  configuredMaximumAllocationWaitTime = recoveryWaitMs;

  // Must run after nmExpireInterval is set: the cleanup task is scheduled
  // with that delay.
  createReleaseCache();
  super.serviceInit(conf);
}
/**
 * Collects the live containers of an application's current scheduler attempt
 * that should be handed over to {@code currentAttempt}, i.e. every live
 * container except the ApplicationMaster's own container.
 *
 * <p>An empty list is returned when the application is unknown to the
 * ResourceManager or to the scheduler, when it uses an unmanaged AM, or when
 * it has no current scheduler attempt.
 *
 * @param currentAttempt attempt whose application is inspected
 * @return a new, mutable list of containers; never {@code null}
 */
public List<Container> getTransferredContainers(
    ApplicationAttemptId currentAttempt) {
  ApplicationId appId = currentAttempt.getApplicationId();
  List<Container> containerList = new ArrayList<Container>();

  // Look the RMApp up once and reuse it; the map may change concurrently,
  // so a second lookup could return null.
  RMApp appImpl = this.rmContext.getRMApps().get(appId);
  if (appImpl == null
      || appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
    return containerList;
  }

  SchedulerApplication<T> app = applications.get(appId);
  if (app == null) {
    return containerList;
  }
  T schedulerAttempt = app.getCurrentAppAttempt();
  if (schedulerAttempt == null) {
    return containerList;
  }

  // Id of the container the ApplicationMaster runs in; stays null when no
  // master container is recorded, in which case nothing is filtered out.
  ContainerId amContainerId = null;
  RMAppAttempt rmAttempt = appImpl.getCurrentAppAttempt();
  if (rmAttempt != null && rmAttempt.getMasterContainer() != null) {
    amContainerId = rmAttempt.getMasterContainer().getId();
  }

  for (RMContainer rmContainer : schedulerAttempt.getLiveContainers()) {
    if (amContainerId == null
        || !rmContainer.getContainerId().equals(amContainerId)) {
      containerList.add(rmContainer.getContainer());
    }
  }
  return containerList;
}
/**
 * Exposes the scheduler's application map.
 *
 * @return the live {@code applications} map itself (not a copy)
 */
public Map<ApplicationId, SchedulerApplication<T>> getSchedulerApplications() {
  return this.applications;
}
/**
 * Returns the total resource capacity of the cluster.
 *
 * @return the {@code clusterResource} instance held by this scheduler
 */
@Override
public Resource getClusterResource() {
  return this.clusterResource;
}
/**
 * Returns the minimum allocatable {@link Resource}.
 *
 * @return the {@code minimumAllocation} instance held by this scheduler
 */
@Override
public Resource getMinimumResourceCapability() {
  return this.minimumAllocation;
}
/**
 * Returns the maximum allocatable {@link Resource} at cluster level.
 *
 * <p>While {@code useConfiguredMaximumAllocationOnly} is set, a clone of the
 * configured maximum is returned. Once more than
 * {@code configuredMaximumAllocationWaitTime} milliseconds have passed since
 * the cluster timestamp the flag is cleared; the call that clears it still
 * returns the configured maximum, later calls return a clone of the
 * effective maximum.
 *
 * @return a fresh clone of the applicable maximum resource
 */
@Override
public Resource getMaximumResourceCapability() {
  maxAllocReadLock.lock();
  try {
    if (!useConfiguredMaximumAllocationOnly) {
      return Resources.clone(maximumAllocation);
    }
    final long sinceClusterStart =
        System.currentTimeMillis() - ResourceManager.getClusterTimeStamp();
    if (sinceClusterStart > configuredMaximumAllocationWaitTime) {
      // NOTE(review): the flag is written while only the read lock is held.
      useConfiguredMaximumAllocationOnly = false;
    }
    return Resources.clone(configuredMaximumAllocation);
  } finally {
    maxAllocReadLock.unlock();
  }
}
/**
 * Returns the maximum allocatable {@link Resource} for a queue.
 *
 * <p>This base implementation ignores {@code queueName} and returns the
 * cluster-level maximum.
 *
 * @param queueName name of the queue (unused here)
 * @return the result of {@link #getMaximumResourceCapability()}
 */
@Override
public Resource getMaximumResourceCapability(String queueName) {
  final Resource clusterLevelMax = getMaximumResourceCapability();
  return clusterLevelMax;
}
/**
 * Initializes the maximum allocation once.
 *
 * <p>If no configured maximum has been recorded yet, both the configured and
 * the effective maximum are set to independent clones of the argument.
 * Subsequent calls are no-ops.
 *
 * @param maximumAllocation the configured maximum allocation
 */
protected void initMaximumResourceCapability(Resource maximumAllocation) {
  maxAllocWriteLock.lock();
  try {
    if (this.configuredMaximumAllocation != null) {
      return; // already initialized
    }
    this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
    this.maximumAllocation = Resources.clone(maximumAllocation);
  } finally {
    maxAllocWriteLock.unlock();
  }
}
/**
 * Records that a container has been launched on a node.
 *
 * <p>If the container's application has a current attempt, the launch is
 * forwarded to it. Otherwise the launch is logged and a
 * {@link RMNodeCleanContainerEvent} is dispatched for the node so the
 * container gets cleaned up.
 *
 * @param containerId id of the launched container
 * @param node node the container was launched on
 */
protected synchronized void containerLaunchedOnNode(
    ContainerId containerId, SchedulerNode node) {
  // Resolve the attempt that owns the launched container.
  SchedulerApplicationAttempt owner =
      getCurrentAttemptForContainer(containerId);
  if (owner != null) {
    owner.containerLaunchedOnNode(containerId, node.getNodeID());
    return;
  }

  LOG.info("Unknown application "
      + containerId.getApplicationAttemptId().getApplicationId()
      + " launched container " + containerId + " on node: " + node);
  RMNodeCleanContainerEvent cleanup =
      new RMNodeCleanContainerEvent(node.getNodeID(), containerId);
  this.rmContext.getDispatcher().getEventHandler().handle(cleanup);
}
/**
 * Returns the current scheduler attempt of the application that
 * {@code applicationAttemptId} belongs to.
 *
 * <p>Only the application id part is used for the lookup, so the returned
 * attempt is the application's current one, whatever attempt id was passed.
 *
 * @param applicationAttemptId attempt id identifying the application
 * @return the application's current attempt, or {@code null} if the
 *         application is not in the {@code applications} map
 */
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
  ApplicationId owningApp = applicationAttemptId.getApplicationId();
  SchedulerApplication<T> schedulerApp = applications.get(owningApp);
  if (schedulerApp == null) {
    return null;
  }
  return schedulerApp.getCurrentAppAttempt();
}
/**
 * Builds a {@link SchedulerAppReport} for the given application attempt.
 *
 * @param appAttemptId attempt to report on
 * @return a new report, or {@code null} (with a debug log entry) when no
 *         attempt is found
 */
@Override
public SchedulerAppReport getSchedulerAppInfo(
    ApplicationAttemptId appAttemptId) {
  SchedulerApplicationAttempt found = getApplicationAttempt(appAttemptId);
  if (found != null) {
    return new SchedulerAppReport(found);
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
  }
  return null;
}
/**
 * Returns the resource usage report of the given application attempt.
 *
 * @param appAttemptId attempt to report on
 * @return the attempt's usage report, or {@code null} (with a debug log
 *         entry) when no attempt is found
 */
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
    ApplicationAttemptId appAttemptId) {
  SchedulerApplicationAttempt found = getApplicationAttempt(appAttemptId);
  if (found != null) {
    return found.getResourceUsageReport();
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
  }
  return null;
}
/**
 * Returns the current attempt of the application that owns a container.
 *
 * @param containerId id of the container
 * @return the result of {@link #getApplicationAttempt} for the container's
 *         application attempt id; may be {@code null}
 */
public T getCurrentAttemptForContainer(ContainerId containerId) {
  ApplicationAttemptId owningAttemptId = containerId.getApplicationAttemptId();
  return getApplicationAttempt(owningAttemptId);
}
/**
 * Looks up the {@link RMContainer} for a container id.
 *
 * @param containerId id of the container
 * @return the container as known to the owning application's current
 *         attempt, or {@code null} when no such attempt exists
 */
@Override
public RMContainer getRMContainer(ContainerId containerId) {
  SchedulerApplicationAttempt owner =
      getCurrentAttemptForContainer(containerId);
  if (owner == null) {
    return null;
  }
  return owner.getRMContainer(containerId);
}
/**
 * Returns a usage report for a node.
 *
 * @param nodeId id of the node
 * @return a new {@link SchedulerNodeReport}, or {@code null} when the node
 *         is not in the {@code nodes} map
 */
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
  N known = nodes.get(nodeId);
  if (known == null) {
    return null;
  }
  return new SchedulerNodeReport(known);
}
/**
 * Moves an application to another queue. Not supported by this base class.
 *
 * @param appId application to move
 * @param newQueue destination queue
 * @return never returns normally
 * @throws YarnException always
 */
@Override
public String moveApplication(ApplicationId appId, String newQueue)
    throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(
      schedulerName + " does not support moving apps between queues");
}
/**
 * Removes an existing queue. Not supported by this base class.
 *
 * @param queueName queue to remove
 * @throws YarnException always
 */
public void removeQueue(String queueName) throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(
      schedulerName + " does not support removing queues");
}
/**
 * Adds a new queue to the scheduler. Not supported by this base class.
 *
 * @param newQueue queue to add
 * @throws YarnException always
 */
@Override
public void addQueue(Queue newQueue) throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(
      schedulerName + " does not support this operation");
}
/**
 * Sets the entitlement of a queue. Not supported by this base class.
 *
 * @param queue name of the queue
 * @param entitlement entitlement to apply
 * @throws YarnException always
 */
@Override
public void setEntitlement(String queue, QueueEntitlement entitlement)
    throws YarnException {
  final String schedulerName = getClass().getSimpleName();
  throw new YarnException(
      schedulerName + " does not support this operation");
}
/**
 * Asks a node to clean up an orphaned container.
 *
 * <p>Nothing is done for containers already in state
 * {@link ContainerState#COMPLETE}; for any other state a
 * {@link RMNodeCleanContainerEvent} is dispatched for the node.
 *
 * @param node node that reported the container
 * @param container status of the orphaned container
 */
private void killOrphanContainerOnNode(RMNode node,
    NMContainerStatus container) {
  if (container.getContainerState().equals(ContainerState.COMPLETE)) {
    return;
  }
  RMNodeCleanContainerEvent cleanup = new RMNodeCleanContainerEvent(
      node.getNodeID(), container.getContainerId());
  this.rmContext.getDispatcher().getEventHandler().handle(cleanup);
}
// 在节点上恢复容器
public synchronized void recoverContainersOnNode(
List<NMContainerStatus> containerReports, RMNode nm) {
if (!rmContext.isWorkPreservingRecoveryEnabled()
|| containerReports == null
|| (containerReports != null && containerReports.isEmpty())) {
return;
}
for (NMContainerStatus container : containerReports) {
/*
* container.getContainerId()获取容器的<code> ContainerId </code>。
* getApplicationAttemptId() 获取分配了<code> Container </code>的应用程序的<code> ApplicationAttemptId </code>。
* getApplicationId() 获取<code> ApplicationAttempId </ code>的<code> ApplicationId </code>。
*/
ApplicationId appId =
container.getContainerId().getApplicationAttemptId().getApplicationId();
//
RMApp rmApp = rmContext.getRMApps().get(appId);
if (rmApp == null) {
LOG.error("Skip recovering container " + container
+ " for unknown application.");
killOrphanContainerOnNode(nm, container);
continue;
}
// Unmanaged AM recovery is addressed in YARN-1815
// 未经管理的AM恢复在YARN-1815中得到解决
// rmApp.getApplicationSubmissionContext()函数表示{@link RMApp}的应用程序提交上下文
// getUnmanagedAM()获取是否RM应该管理AM的执行。如果为真,则RM不会为AM分配容器并启动它。
if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
LOG.info("Skip recovering container " + container + " for unmanaged AM."
+ rmApp.getApplicationId());
killOrphanContainerOnNode(nm, container);
continue;
}
//Map类的get()函数
SchedulerApplication<T> schedulerApp = applications.get(appId);
if (schedulerApp == null) {
//rmApp.getState()表示{@link RMApp}的当前状态。
LOG.info("Skip recovering container " + container
+ " for unknown SchedulerApplication. Application current state is "
+ rmApp.getState());
killOrphanContainerOnNode(nm, container);
continue;
}
LOG.info("Recovering container " + container);
SchedulerApplicationAttempt schedulerAttempt =
schedulerApp.getCurrentAppAttempt();
// getKeepContainersAcrossApplicationAttempts()函数 获取指示是否在应用程序尝试中保留容器的标志
if (!rmApp.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
// Do not recover containers for stopped attempt or previous attempt.
// 不要因为停止了的尝试或以前的尝试恢复容器。
if (schedulerAttempt.isStopped()
|| !schedulerAttempt.getApplicationAttemptId().equals(
container.getContainerId().getApplicationAttemptId())) {
LOG.info("Skip recovering container " + container
+ " for already stopped attempt.");
killOrphanContainerOnNode(nm, container);
continue;
}
}
// create container
// 创建容器
RMContainer rmContainer = recoverAndCreateContainer(container, nm);
// recover RMContainer
// 恢复 RMContainer
rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
container));
// recover scheduler node
// 恢复调度器节点
nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
// recover queue: update headroom etc.
// 恢复队列:更新净空等等
Queue queue = schedulerAttempt.getQueue();
queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
// recover scheduler attempt
// 恢复调度器尝试
schedulerAttempt.recoverContainer(rmContainer);
// set master container for the current running AMContainer for this
// attempt.
// 为这个尝试 为当前运行的AMContainer设置主容器
RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
if (appAttempt != null) {
// getMasterContainer()函数表示 ApplicationMaster运行在其上的容器
Container masterContainer = appAttempt.getMasterContainer();
// Mark current running AMContainer's RMContainer based on the master
// container> // 根据存储在AppAttempt中的主容器ID,标记当前正在运行的AMContainer的RMContainer。
if (masterContainer != null
&& masterContainer.getId().equals(rmContainer.getContainerId())) {
// 设置ApplicationMaster容器
((RMContainerImpl)rmContainer).setAMContainer(true);
}
}
synchronized (schedulerAttempt) {
// 这个pendingRelease用于工作维护恢复方案,以跟踪AM的未完成发布请求。
// RM恢复可以收到AM的发布请求表,在此之前从NM收到容器状态以进行恢复。
// 在这种情况下,由NM报告的待回收容器不应该被收回。
Set<ContainerId>> // Set类中的contains()函数,
//如果此集合包含指定的元素,则返回<tt> true </ tt>。 更正式地,当且仅当该集合包含元素<tt> e </ tt>时,返回<tt> true </ tt>,
//这样<tt>(o==null ? e==null : o.equals(e))</tt>.
if (releases.contains(container.getContainerId())) {
//> //释放容器
rmContainer.handle(new RMContainerFinishedEvent(container
.getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED));
releases.remove(container.getContainerId());
LOG.info(container.getContainerId() + " is> }
}
}
}
/**
 * Re-creates the {@link RMContainer} for a container reported by a node.
 *
 * <p>A {@link Container} record is built from the reported status and the
 * node's id and HTTP address (the last {@code newInstance} argument is
 * passed as {@code null}), then wrapped in an {@link RMContainerImpl} owned
 * by the user of the container's application.
 *
 * @param status container status reported by the NodeManager
 * @param node node the container runs on
 * @return the newly created RMContainer
 */
private RMContainer recoverAndCreateContainer(NMContainerStatus status,
    RMNode node) {
  Container recovered = Container.newInstance(
      status.getContainerId(),
      node.getNodeID(),
      node.getHttpAddress(),
      status.getAllocatedResource(),
      status.getPriority(),
      null);

  ApplicationAttemptId owningAttempt =
      recovered.getId().getApplicationAttemptId();
  // The application is expected to be present in the applications map here.
  String user = applications.get(owningAttempt.getApplicationId()).getUser();

  return new RMContainerImpl(recovered, owningAttempt, node.getNodeID(),
      user, rmContext, status.getCreationTime());
}
/**
 * Recover resource request back from RMContainer when a container is
 * preempted before AM pulled the same. If container is pulled by
 * AM, then RMContainer will not have resource request to recover.
 *
 * @param rmContainer container whose original resource requests should be
 *        handed back to its application's current attempt
 */
protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
  List<ResourceRequest> pendingRequests = rmContainer.getResourceRequests();

  // If container state is moved to ACQUIRED, request will be empty.
  if (pendingRequests != null) {
    // Add resource request back to Scheduler.
    SchedulerApplicationAttempt owner =
        getCurrentAttemptForContainer(rmContainer.getContainerId());
    if (owner != null) {
      owner.recoverResourceRequests(pendingRequests);
    }
  }
}
protected void createReleaseCache() {
// Cleanup the cache after nm expire interval.
// 在nm到期之际后清除缓存。
// Timer类创建一个新的计时器。schedule()函数表示在指定的延迟之后安排指定的任务执行。
new Timer().schedule(new TimerTask() {
@Override
public void run() {
// Map类的values()函数表示 返回此map中包含的值的{@link Collection}视图。
for (SchedulerApplication<T> app : applications.values()) {
// 获取当前应用程序的尝试
T attempt = app.getCurrentAppAttempt();
synchronized (attempt) {
//
for (ContainerId containerId : attempt.getPendingRelease()) {
// logFailure()函数表示 为失败的事件创建可读和可分析的审核日志字符串。
RMAuditLogger.logFailure(
app.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container",
"Scheduler",
"Trying to> attempt.getApplicationId(), containerId);
}
// Set类的clear()函数表示 从此set中删除所有元素(可选操作)。 此调用返回后,该组将为空。
attempt.getPendingRelease().clear();
}
}
LOG.info("Release request cache is cleaned up");
}
}, nmExpireInterval);
}
/**
 * Clean up a completed container. Implemented by concrete schedulers.
 *
 * @param rmContainer the completed container; the release path in this
 *        class can pass {@code null} here when the container is unknown
 * @param containerStatus status to record for the container
 * @param event event type describing why the container completed
 */
protected abstract void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event);
// 清除容器
protected void> SchedulerApplicationAttempt attempt) {
for (ContainerId containerId : containers) {
// 获取给定containerId的容器。
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
//
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
< nmExpireInterval) {
LOG.info(containerId + " doesn't exist. Add the container"
+ " to the> synchronized (attempt) {
// Set类的add()函数表示 如果指定的元素不存在,则将其指定的元素添加到这个set(可选操作)。
// 更正式地,如果set不包含元素<tt> e2 </tt>,则将指定的元素<tt> e </tt>添加到此set,以便
//<tt>(e==null ? e2==null : e.equals(e2))</tt>.
attempt.getPendingRelease().add(containerId);
}
} else {
// logFailure()函数表示 为失败的事件创建可读和可分析的审核日志字符串
RMAuditLogger.logFailure(attempt.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "Scheduler",
"Trying to> attempt.getApplicationId(), containerId);
}
}
// 清理完成的容器
// createAbnormalContainerStatus()函数表示在特殊情况下创建{@link ContainerStatus}的实用程序。
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(containerId,
SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
}
}
/**
 * Returns the scheduler's view of a cluster node.
 *
 * @param nodeId id of the node
 * @return the node stored in the {@code nodes} map, or {@code null} if the
 *         map has no entry for {@code nodeId}
 */
public SchedulerNode getSchedulerNode(NodeId nodeId) {
  final SchedulerNode known = nodes.get(nodeId);
  return known;
}
/**
 * Moves every application of {@code sourceQueue} to {@code destQueue}.
 *
 * <p>The destination is validated through {@code getQueueInfo} and the
 * source through {@code getAppsInQueue}. One {@link RMAppMoveEvent} is then
 * dispatched per application attempt found in the source queue. The events
 * are only dispatched; their futures are not awaited here.
 *
 * @param sourceQueue queue to drain
 * @param destQueue queue that receives the applications
 * @throws YarnException if the destination lookup fails with an
 *         {@link IOException} or the source queue does not exist
 */
@Override
public synchronized void moveAllApps(String sourceQueue, String destQueue)
    throws YarnException {
  // check if destination queue is a valid leaf queue
  try {
    getQueueInfo(destQueue, false, false);
  } catch (IOException lookupFailure) {
    LOG.warn(lookupFailure);
    throw new YarnException(lookupFailure);
  }

  // check if source queue is a valid
  List<ApplicationAttemptId> attemptsInSource = getAppsInQueue(sourceQueue);
  if (attemptsInSource == null) {
    String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
    LOG.warn(errMsg);
    throw new YarnException(errMsg);
  }

  // generate move events for each pending/running app
  for (ApplicationAttemptId attemptId : attemptsInSource) {
    SettableFuture<Object> moveResult = SettableFuture.create();
    RMAppMoveEvent moveEvent =
        new RMAppMoveEvent(attemptId.getApplicationId(), destQueue, moveResult);
    this.rmContext.getDispatcher().getEventHandler().handle(moveEvent);
  }
}
/**
 * Kills every application in the given queue.
 *
 * <p>One {@link RMAppEventType#KILL} event is dispatched per application
 * attempt returned by {@code getAppsInQueue}.
 *
 * @param queueName queue whose applications are killed
 * @throws YarnException if the queue does not exist
 */
@Override
public synchronized void killAllAppsInQueue(String queueName)
    throws YarnException {
  // check if queue is a valid
  List<ApplicationAttemptId> attemptsInQueue = getAppsInQueue(queueName);
  if (attemptsInQueue == null) {
    String errMsg = "The specified Queue: " + queueName + " doesn't exist";
    LOG.warn(errMsg);
    throw new YarnException(errMsg);
  }

  // generate kill events for each pending/running app
  for (ApplicationAttemptId attemptId : attemptsInQueue) {
    RMAppEvent killEvent = new RMAppEvent(attemptId.getApplicationId(),
        RMAppEventType.KILL,
        "Application killed due to expiry of reservation queue " +
        queueName + ".");
    this.rmContext.getDispatcher().getEventHandler().handle(killEvent);
  }
}
/**
 * Process resource update on a node.
 *
 * <p>If the new total equals the current one, only a warning is logged.
 * Otherwise the node is taken out of the {@code nodes} map, the maximum
 * allocation is updated as for a removed node, the new total is set, the
 * node is put back, the maximum allocation is updated as for an added node,
 * and {@code clusterResource} is adjusted by the difference.
 *
 * @param nm node whose resource changed
 * @param resourceOption carries the node's new total resource
 */
public synchronized void updateNodeResource(RMNode nm,
    ResourceOption resourceOption) {
  final NodeId nodeId = nm.getNodeID();
  SchedulerNode node = getSchedulerNode(nodeId);
  Resource newResource = resourceOption.getResource();
  Resource oldResource = node.getTotalResource();

  if (oldResource.equals(newResource)) {
    // Nothing to change; just record the redundant update.
    LOG.warn("Update resource on node: " + node.getNodeName()
        + " with the same resource: " + newResource);
    return;
  }

  // Log resource change
  LOG.info("Update resource on node: " + node.getNodeName()
      + " from: " + oldResource + ", to: "
      + newResource);

  // Remove first: the "removed node" pass of updateMaximumAllocation may
  // rescan the nodes map and must not see this node's old size.
  nodes.remove(nodeId);
  updateMaximumAllocation(node, false);

  // update resource to node
  node.setTotalResource(newResource);

  nodes.put(nodeId, (N) node);
  updateMaximumAllocation(node, true);

  // update resource to clusterResource
  Resources.subtractFrom(clusterResource, oldResource);
  Resources.addTo(clusterResource, newResource);
}
/**
 * Returns the set of resource types considered when scheduling.
 *
 * @return a new set containing only {@link SchedulerResourceTypes#MEMORY}
 */
@Override
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
  final EnumSet<SchedulerResourceTypes> memoryOnly =
      EnumSet.of(SchedulerResourceTypes.MEMORY);
  return memoryOnly;
}
// 获取由预留系统管理的队列的名称列表
@Override
public Set<String> getPlanQueues() throws YarnException {
// Object类的getClass()函数 返回此{@code Object}的运行时类。
// 返回的{@code>
//> throw new YarnException(getClass().getSimpleName()
+ " does not support reservations");
}
/**
 * Updates the effective maximum allocation after a node was added or
 * removed.
 *
 * <p>On add, a per-dimension node maximum is raised when the node exceeds
 * it, and the effective maximum for that dimension becomes the smaller of
 * the configured maximum and the new node maximum.
 *
 * <p>On removal, a node maximum equal to the removed node's total is reset
 * to -1. If either one was reset, both are recomputed from the remaining
 * entries of the {@code nodes} map and both dimensions of the effective
 * maximum are refreshed. A dimension still at -1 (no nodes) falls back to
 * the configured maximum.
 *
 * @param node node that was added or removed
 * @param add {@code true} for an added node, {@code false} for a removed one
 */
protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
  Resource totalResource = node.getTotalResource();
  maxAllocWriteLock.lock();
  try {
    if (add) { // added node
      int addedMemory = totalResource.getMemory();
      if (addedMemory > maxNodeMemory) {
        maxNodeMemory = addedMemory;
        maximumAllocation.setMemory(Math.min(
            configuredMaximumAllocation.getMemory(), maxNodeMemory));
      }
      int addedVCores = totalResource.getVirtualCores();
      if (addedVCores > maxNodeVCores) {
        maxNodeVCores = addedVCores;
        maximumAllocation.setVirtualCores(Math.min(
            configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
      }
      return;
    }

    // removed node
    if (maxNodeMemory == totalResource.getMemory()) {
      maxNodeMemory = -1;
    }
    if (maxNodeVCores == totalResource.getVirtualCores()) {
      maxNodeVCores = -1;
    }

    // We only have to iterate through the nodes if the current max memory
    // or vcores was equal to the removed node's
    if (maxNodeMemory != -1 && maxNodeVCores != -1) {
      return;
    }

    for (N remaining : nodes.values()) {
      Resource remainingTotal = remaining.getTotalResource();
      maxNodeMemory = Math.max(maxNodeMemory, remainingTotal.getMemory());
      maxNodeVCores =
          Math.max(maxNodeVCores, remainingTotal.getVirtualCores());
    }

    // -1 after the scan means there are no nodes left.
    int configuredMemory = configuredMaximumAllocation.getMemory();
    maximumAllocation.setMemory(maxNodeMemory == -1
        ? configuredMemory
        : Math.min(configuredMemory, maxNodeMemory));

    int configuredVCores = configuredMaximumAllocation.getVirtualCores();
    maximumAllocation.setVirtualCores(maxNodeVCores == -1
        ? configuredVCores
        : Math.min(configuredVCores, maxNodeVCores));
  } finally {
    maxAllocWriteLock.unlock();
  }
}
/**
 * Replaces the configured maximum allocation and recomputes the effective
 * one.
 *
 * <p>The configured maximum becomes a clone of {@code newMaxAlloc}. The
 * effective maximum is a new resource whose memory and vcores are those of
 * {@code newMaxAlloc}, each capped by the largest known node value when one
 * is recorded (i.e. not -1).
 *
 * @param newMaxAlloc the new configured maximum allocation
 */
protected void refreshMaximumAllocation(Resource newMaxAlloc) {
  maxAllocWriteLock.lock();
  try {
    configuredMaximumAllocation = Resources.clone(newMaxAlloc);

    final int requestedMemory = newMaxAlloc.getMemory();
    final int cappedMemory = (maxNodeMemory == -1)
        ? requestedMemory
        : Math.min(requestedMemory, maxNodeMemory);

    final int requestedVCores = newMaxAlloc.getVirtualCores();
    final int cappedVCores = (maxNodeVCores == -1)
        ? requestedVCores
        : Math.min(requestedVCores, maxNodeVCores);

    maximumAllocation = Resources.createResource(cappedMemory, cappedVCores);
  } finally {
    maxAllocWriteLock.unlock();
  }
}
/**
 * Returns the pending resource requests of an application attempt.
 *
 * @param attemptId attempt to look up
 * @return all resource requests from the attempt's scheduling info, or
 *         {@code null} when no attempt is found
 */
public List<ResourceRequest> getPendingResourceRequestsForAttempt(
    ApplicationAttemptId attemptId) {
  SchedulerApplicationAttempt found = getApplicationAttempt(attemptId);
  if (found == null) {
    return null;
  }
  return found.getAppSchedulingInfo().getAllResourceRequests();
}
}