设为首页 收藏本站
查看: 1200|回复: 0

[经验分享] Hadoop 三大调度器源码分析及编写自己的调度器

[复制链接]

尚未签到

发表于 2017-12-18 11:44:06 | 显示全部楼层 |阅读模式
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;  

  
import java.io.IOException;
  
import java.util.
*;  
import java.util.concurrent.ConcurrentHashMap;
  
import java.util.concurrent.ConcurrentMap;
  
import java.util.concurrent.locks.ReentrantReadWriteLock;
  
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
  
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
  

  
import org.apache.commons.logging.Log;
  
import org.apache.commons.logging.LogFactory;
  
import org.apache.hadoop.classification.InterfaceAudience.Private;
  
import org.apache.hadoop.classification.InterfaceStability.Unstable;
  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.service.AbstractService;
  
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
  
import org.apache.hadoop.yarn.api.records.ApplicationId;
  
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
  
import org.apache.hadoop.yarn.api.records.Container;
  
import org.apache.hadoop.yarn.api.records.ContainerId;
  
import org.apache.hadoop.yarn.api.records.ContainerState;
  
import org.apache.hadoop.yarn.api.records.ContainerStatus;
  
import org.apache.hadoop.yarn.api.records.NodeId;
  
import org.apache.hadoop.yarn.api.records.Resource;
  
import org.apache.hadoop.yarn.api.records.ResourceOption;
  
import org.apache.hadoop.yarn.api.records.ResourceRequest;
  
import org.apache.hadoop.yarn.conf.YarnConfiguration;
  
import org.apache.hadoop.yarn.exceptions.YarnException;
  
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
  
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
  
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
  
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
  
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
  
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
  
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
  
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
  
import org.apache.hadoop.yarn.util.resource.Resources;
  

  
import com.google.common.util.concurrent.SettableFuture;
  

  

  
@SuppressWarnings(
"unchecked")  
@Private
  
@Unstable
  

public abstract><T extends SchedulerApplicationAttempt, N extends SchedulerNode>  extends AbstractService implements ResourceScheduler {
  

private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);  

// Nodes in the cluster, indexed by NodeId// 在集群中的节点,用NodeId索引  protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
  

  // Whole capacity of the cluster
  // 集群全部容量
  protected Resource clusterResource = Resource.newInstance(0, 0);
  

  protected Resource minimumAllocation;
  private Resource maximumAllocation;
  private Resource configuredMaximumAllocation;
  private int maxNodeMemory = -1;
  private int maxNodeVCores = -1;
  private final ReadLock maxAllocReadLock;
  private final WriteLock maxAllocWriteLock;
  

  private boolean useConfiguredMaximumAllocationOnly = true;
  private long configuredMaximumAllocationWaitTime;
  

  protected RMContext rmContext;
  /*
  * All schedulers which are inheriting AbstractYarnScheduler should use
  * concurrent version of 'applications' map.
  */
  // 所有继承自AbstractYarnScheduler的调度器应该使用并行版本的'applications' map
  protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
  protected int nmExpireInterval;
  

  protected final static List<Container> EMPTY_CONTAINER_LIST =
  new ArrayList<Container>();
  protected static final Allocation EMPTY_ALLOCATION = new Allocation(
  EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
  

  /**
  * Construct the service.
  *
  * @param name service name
  */
  /*
  * 构造服务
  * 参数name表示服务名
  */
  public AbstractYarnScheduler(String name) {
  super(name);
  ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  this.maxAllocReadLock = lock.readLock();
  this.maxAllocWriteLock = lock.writeLock();
  }
  

  // 服务所需的所有初始化代码。
  
  @Override
  public void serviceInit(Configuration conf) throws Exception {
  // getInt()表示获取<code> name </code>属性的值作为<code> int </code>。
  // 如果没有这样的属性,返回提供的默认值,或者如果指定的值不是有效的<code> int </ code>,那么会抛出一个错误。
  // 第一个参数是String name,第二个参数int defaultValue
  // DEFAULT_RM_NM_EXPIRY_INTERVAL_MS指节点管理器被认为死所要的等待的时间,默认为600000ms。
  nmExpireInterval =
  conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
  YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
  // 获取<code> name </code>属性的值作为<code> long </code>。
  // 如果没有这样的属性,返回所提供的默认值,或者如果指定的值不是有效的<code> long </ code>,则会抛出错误。
  // 第一个参数是String name,第二个参数long defaultValue
  // DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS指,默认为10000ms。
  configuredMaximumAllocationWaitTime =
  conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
  YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
  //
  
    createReleaseCache();
  super.serviceInit(conf);
  }
  

  public List<Container> getTransferredContainers(
  ApplicationAttemptId currentAttempt) {
  // 从<code>ApplicationAttempId</code>中获取<code>ApplicationId</code>
  ApplicationId appId = currentAttempt.getApplicationId();
  // 调用的get()函数是Map类,返回指定键映射到的值,如果此映射不包含该键的映射,则返回{@code null}。
  SchedulerApplication<T> app = applications.get(appId);
  // 构造一个初始容量为十的空列表。它只接收Container类型
  List<Container> containerList = new ArrayList<Container>();
  // rmContext是接口RMContext的对象,而该接口只有一个实现类RMContextImpl,
  // rmContext.getRMApps()返回ConcurrentMap<ApplicationId, RMApp>
  // rmContext.getRMApps().get(appId)调用的是Map类的get()函数。
  RMApp appImpl = this.rmContext.getRMApps().get(appId);
  // appImpl是接口RMApp对象,
  // appImpl.getApplicationSubmissionContext()此{@link RMApp}的应用程序提交上下文,返回ApplicationSubmissionContext
  // getUnmanagedAM()获取是否RM应该管理AM的执行。
  if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
  return containerList;
  }
  if (app == null) {
  return containerList;
  }
  // getLiveContainers()获取应用程序的活动容器并返回。
  Collection<RMContainer> liveContainers =
  app.getCurrentAppAttempt().getLiveContainers();
  // getMasterContainer()是供Application Master运行的容器,
  // Container类的getId()获取容器的全局唯一标识符。
  // 最后获取的是Application Master的容器Id
  ContainerId amContainerId =
  rmContext.getRMApps().get(appId).getCurrentAppAttempt()
  .getMasterContainer().getId();
  for (RMContainer rmContainer : liveContainers) {
  // 判断当前的Id是否是Application Master的容器Id
  if (!rmContainer.getContainerId().equals(amContainerId)) {
  // 不相等,则往容器列表中添加容器
  
        containerList.add(rmContainer.getContainer());
  }
  }
  return containerList;
  }
  

  public Map<ApplicationId, SchedulerApplication<T>>
  getSchedulerApplications() {
  return applications;
  }
  

  // 获取集群的整个资源容量。
  
  @Override
  public Resource getClusterResource() {
  return clusterResource;
  }
  

  // 获取最小可分配{@link Resource}。
  
  @Override
  public Resource getMinimumResourceCapability() {
  return minimumAllocation;
  }
  

  // 在集群级别获取最大可分配{@link Resource}。
  
  @Override
  public Resource getMaximumResourceCapability() {
  Resource maxResource;
  maxAllocReadLock.lock();
  try {
  // 类最开始定义useConfiguredMaximumAllocationOnly为true
  if (useConfiguredMaximumAllocationOnly) {
  // System.currentTimeMillis()产生一个当前的毫秒,这个毫秒其实就是自1970年1月1日0时起的毫秒数
  // ResourceManager.getClusterTimeStamp()调用的也是System.currentTimeMillis(),
  // configuredMaximumAllocationWaitTime默认值为10000ms
  if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
  > configuredMaximumAllocationWaitTime) {
  useConfiguredMaximumAllocationOnly = false;    //设为false
  
        }
  // 克隆一份资源
  maxResource = Resources.clone(configuredMaximumAllocation);
  } else {
  maxResource = Resources.clone(maximumAllocation);
  }
  } finally {
  maxAllocReadLock.unlock();
  }
  return maxResource;
  }
  

  //
  
  @Override
  public Resource getMaximumResourceCapability(String queueName) {
  return getMaximumResourceCapability();
  }
  

  // 初始化最大资源容量
  protected void initMaximumResourceCapability(Resource maximumAllocation) {
  maxAllocWriteLock.lock();
  try {
  if (this.configuredMaximumAllocation == null) {
  // 克隆资源
  this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
  this.maximumAllocation = Resources.clone(maximumAllocation);
  }
  } finally {
  maxAllocWriteLock.unlock();
  }
  }
  

  //
  
  protected synchronized void containerLaunchedOnNode(
  ContainerId containerId, SchedulerNode node) {
  // Get the application for the finished container
  // 获取完成了的容器的应用程序
  SchedulerApplicationAttempt application = getCurrentAttemptForContainer
  (containerId);
  if (application == null) {
  // getApplicationAttemptId()获取分配了<code> Container </code>的应用程序的<code> ApplicationAttemptId </code>。
  // getApplicationId() 获取<code> ApplicationAttempId </code>的<code> ApplicationId </code>。
  LOG.info("Unknown application "
  + containerId.getApplicationAttemptId().getApplicationId()
  + " launched container " + containerId + " on node: " + node);
  // rmContext是接口RMContext的对象, rmContext.getDispatcher()返回接口Dispatcher的对象,
  // rmContext.getDispatcher().getEventHandler()返回接口EventHandler对象, 最后调用EventHandler的handle()方法
  // RMNodeCleanContainerEvent表示资源管理器节点清除容器事件,构造函数内部有RMNodeEventType.CLEANUP_CONTAINER
  this.rmContext.getDispatcher().getEventHandler()
  .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
  return;
  }
  

  application.containerLaunchedOnNode(containerId, node.getNodeID());
  }
  

  //
  
  public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
  // getApplicationId() 获取<code> ApplicationAttempId </code>的<code> ApplicationId </code>。
  // 调用的get()函数是Map类,返回指定键映射到的值,如果此映射不包含该键的映射,则返回{@code null}。
  SchedulerApplication<T> app =
  applications.get(applicationAttemptId.getApplicationId());
  // getCurrentAppAttempt()返回的是SchedulerApplicationAttempt类对象
  return app == null ? null : app.getCurrentAppAttempt();
  }
  

  // 从给定应用程序尝试Id中获取调度器应用程序
  
  @Override
  public SchedulerAppReport getSchedulerAppInfo(
  ApplicationAttemptId appAttemptId) {
  SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
  if (attempt == null) {
  if (LOG.isDebugEnabled()) {
  LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
  }
  return null;
  }
  // SchedulerAppReport类 表示应用程序尝试,以及尝试使用的资源。
  return new SchedulerAppReport(attempt);
  }
  

  // 从给定的应用程序尝试ID获取资源使用情况报告。
  
  @Override
  public ApplicationResourceUsageReport getAppResourceUsageReport(
  ApplicationAttemptId appAttemptId) {
  SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
  if (attempt == null) {
  if (LOG.isDebugEnabled()) {
  LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
  }
  return null;
  }
  //
  
    return attempt.getResourceUsageReport();
  }
  

  // 根据容器Id获取当前应用程序的尝试
  public T getCurrentAttemptForContainer(ContainerId containerId) {
  return getApplicationAttempt(containerId.getApplicationAttemptId());
  }
  

  // 获取给定containerId的容器。
  
  @Override
  public RMContainer getRMContainer(ContainerId containerId) {
  SchedulerApplicationAttempt attempt =
  getCurrentAttemptForContainer(containerId);
  // getRMContainer()方法表示获取资源管理器容器
  return (attempt == null) ? null : attempt.getRMContainer(containerId);
  }
  

  // 获取节点资源使用情况报告。
  
  @Override
  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
  // Map类方法get()
  N node = nodes.get(nodeId);
  // SchedulerNodeReport类表示节点使用报告
  return node == null ? null : new SchedulerNodeReport(node);
  }
  

  // 将给定的应用程序移动到给定的队列
  
  @Override
  public String moveApplication(ApplicationId appId, String newQueue)
  throws YarnException {
  throw new YarnException(getClass().getSimpleName()
  + " does not support moving apps between queues");
  }
  

  // 移除一个已有的队列
  public void removeQueue(String queueName) throws YarnException {
  throw new YarnException(getClass().getSimpleName()
  + " does not support removing queues");
  }
  

  // 把一个新队列添加到调度器。
  
  @Override
  public void addQueue(Queue newQueue) throws YarnException {
  throw new YarnException(getClass().getSimpleName()
  + " does not support this operation");
  }
  

  // 此方法增加了当前队列的权限
  
  @Override
  public void setEntitlement(String queue, QueueEntitlement entitlement)
  throws YarnException {
  throw new YarnException(getClass().getSimpleName()
  + " does not support this operation");
  }
  

  //在节点上杀死孤立容器
  private void killOrphanContainerOnNode(RMNode node,
  NMContainerStatus container) {
  // getContainerState()获取容器的状态
  // Enum类的equals()函数表示 如果指定的对象等于此枚举常量,则返回true。否则false。
  // ContainerState类表示容器的状态,有三种NEW, RUNNING, COMPLETE。COMPLETE表示完成的容器。
  if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
  // 在本类的containerLaunchedOnNode()函数中有一样的,略
  this.rmContext.getDispatcher().getEventHandler().handle(
  new RMNodeCleanContainerEvent(node.getNodeID(),
  container.getContainerId()));
  }
  }
  

  // 在节点上恢复容器
  public synchronized void recoverContainersOnNode(
  List<NMContainerStatus> containerReports, RMNode nm) {
  if (!rmContext.isWorkPreservingRecoveryEnabled()
  || containerReports == null
  || (containerReports != null && containerReports.isEmpty())) {
  return;
  }
  

  for (NMContainerStatus container : containerReports) {
  /*
  * container.getContainerId()获取容器的<code> ContainerId </code>。
  * getApplicationAttemptId() 获取分配了<code> Container </code>的应用程序的<code> ApplicationAttemptId </code>。
  * getApplicationId() 获取<code> ApplicationAttempId </ code>的<code> ApplicationId </code>。
  */
  ApplicationId appId =
  container.getContainerId().getApplicationAttemptId().getApplicationId();
  //
  
      RMApp rmApp = rmContext.getRMApps().get(appId);
  if (rmApp == null) {
  LOG.error("Skip recovering container " + container
  + " for unknown application.");
  killOrphanContainerOnNode(nm, container);
  continue;
  }
  

  // Unmanaged AM recovery is addressed in YARN-1815
  // 未经管理的AM恢复在YARN-1815中得到解决
  // rmApp.getApplicationSubmissionContext()函数表示{@link RMApp}的应用程序提交上下文
  // getUnmanagedAM()获取是否RM应该管理AM的执行。如果为真,则RM不会为AM分配容器并启动它。
  if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
  LOG.info("Skip recovering container " + container + " for unmanaged AM."
  + rmApp.getApplicationId());
  killOrphanContainerOnNode(nm, container);
  continue;
  }
  

  //Map类的get()函数
  SchedulerApplication<T> schedulerApp = applications.get(appId);
  if (schedulerApp == null) {
  //rmApp.getState()表示{@link RMApp}的当前状态。
  LOG.info("Skip recovering container  " + container
  + " for unknown SchedulerApplication. Application current state is "
  + rmApp.getState());
  killOrphanContainerOnNode(nm, container);
  continue;
  }
  

  LOG.info("Recovering container " + container);
  SchedulerApplicationAttempt schedulerAttempt =
  schedulerApp.getCurrentAppAttempt();
  

  // getKeepContainersAcrossApplicationAttempts()函数 获取指示是否在应用程序尝试中保留容器的标志
  if (!rmApp.getApplicationSubmissionContext()
  .getKeepContainersAcrossApplicationAttempts()) {
  // Do not recover containers for stopped attempt or previous attempt.
  // 不要因为停止了的尝试或以前的尝试恢复容器。
  if (schedulerAttempt.isStopped()
  || !schedulerAttempt.getApplicationAttemptId().equals(
  container.getContainerId().getApplicationAttemptId())) {
  LOG.info("Skip recovering container " + container
  + " for already stopped attempt.");
  killOrphanContainerOnNode(nm, container);
  continue;
  }
  }
  

  // create container
  // 创建容器
  RMContainer rmContainer = recoverAndCreateContainer(container, nm);
  

  // recover RMContainer
  // 恢复 RMContainer
  rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
  container));
  

  // recover scheduler node
  // 恢复调度器节点
  nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
  

  // recover queue: update headroom etc.
  // 恢复队列:更新净空等等
  Queue queue = schedulerAttempt.getQueue();
  queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
  

  // recover scheduler attempt
  // 恢复调度器尝试
  
      schedulerAttempt.recoverContainer(rmContainer);
  // set master container for the current running AMContainer for this
  // attempt.
  // 为这个尝试 为当前运行的AMContainer设置主容器
  RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
  if (appAttempt != null) {
  // getMasterContainer()函数表示 ApplicationMaster运行在其上的容器
  Container masterContainer = appAttempt.getMasterContainer();
  

  // Mark current running AMContainer's RMContainer based on the master

  // container>  // 根据存储在AppAttempt中的主容器ID,标记当前正在运行的AMContainer的RMContainer。
  if (masterContainer != null
  && masterContainer.getId().equals(rmContainer.getContainerId())) {
  // 设置ApplicationMaster容器
  ((RMContainerImpl)rmContainer).setAMContainer(true);
  }
  }
  

  synchronized (schedulerAttempt) {
  // 这个pendingRelease用于工作维护恢复方案,以跟踪AM的未完成发布请求。
  // RM恢复可以收到AM的发布请求表,在此之前从NM收到容器状态以进行恢复。
  // 在这种情况下,由NM报告的待回收容器不应该被收回。

  Set<ContainerId>>  // Set类中的contains()函数,
  //如果此集合包含指定的元素,则返回<tt> true </ tt>。 更正式地,当且仅当该集合包含元素<tt> e </ tt>时,返回<tt> true </ tt>,
  //这样<tt>(o==null&nbsp;?&nbsp;e==null&nbsp;:&nbsp;o.equals(e))</tt>.
  if (releases.contains(container.getContainerId())) {

  //>  //释放容器
  rmContainer.handle(new RMContainerFinishedEvent(container
  .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
  container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
  RMContainerEventType.RELEASED));
  releases.remove(container.getContainerId());

  LOG.info(container.getContainerId() + " is>  }
  }
  }
  }
  

  // 恢复并创建容器
  // NMContainerStatus包括容器的当前信息。
  // RMNode类表示节点管理器有关可用资源和其他静态信息的信息。
  private RMContainer recoverAndCreateContainer(NMContainerStatus status,
  RMNode node) {
  // 创建Container实例
  Container container =
  Container.newInstance(status.getContainerId(), node.getNodeID(),
  node.getHttpAddress(), status.getAllocatedResource(),
  status.getPriority(), null);
  // 获取应用程序的尝试Id
  ApplicationAttemptId attemptId =
  container.getId().getApplicationAttemptId();
  // 创建一个RMContainerImpl对象                                                                     
  RMContainer rmContainer =
  new RMContainerImpl(container, attemptId, node.getNodeID(),
  applications.get(attemptId.getApplicationId()).getUser(), rmContext,
  status.getCreationTime());
  return rmContainer;
  }
  

  /**
  * Recover resource request back from RMContainer when a container is
  * preempted before AM pulled the same. If container is pulled by
  * AM, then RMContainer will not have resource request to recover.
  * @param rmContainer
  */
  /*
  * 在AM拉出相同之前当容器被抢占时,从RMContainer恢复资源请求。如果容器被AM拉过来,则RMContainer将不会有资源请求恢复。
  */
  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
  // getResourceRequests()函数获取资源请求
  List<ResourceRequest> requests = rmContainer.getResourceRequests();
  

  // If container state is moved to ACQUIRED, request will be empty.
  // 如果容器状态被移动到 ACQUIRED,请求将为空。
  if (requests == null) {
  return;
  }
  // Add resource request back to Scheduler.
  // 将资源请求添加回调度器。
  
    SchedulerApplicationAttempt schedulerAttempt
  = getCurrentAttemptForContainer(rmContainer.getContainerId());
  if (schedulerAttempt != null) {
  // 恢复资源请求
  
      schedulerAttempt.recoverResourceRequests(requests);
  }
  }
  

  protected void createReleaseCache() {
  // Cleanup the cache after nm expire interval.
  // 在nm到期之际后清除缓存。
  // Timer类创建一个新的计时器。schedule()函数表示在指定的延迟之后安排指定的任务执行。
  new Timer().schedule(new TimerTask() {
  @Override
  public void run() {
  // Map类的values()函数表示 返回此map中包含的值的{@link Collection}视图。
  for (SchedulerApplication<T> app : applications.values()) {
  

  // 获取当前应用程序的尝试
  T attempt = app.getCurrentAppAttempt();
  synchronized (attempt) {
  //
  for (ContainerId containerId : attempt.getPendingRelease()) {
  // logFailure()函数表示 为失败的事件创建可读和可分析的审核日志字符串。
  
              RMAuditLogger.logFailure(
  app.getUser(),
  AuditConstants.RELEASE_CONTAINER,
  "Unauthorized access or invalid container",
  "Scheduler",

  "Trying to>  attempt.getApplicationId(), containerId);
  }
  // Set类的clear()函数表示 从此set中删除所有元素(可选操作)。 此调用返回后,该组将为空。
  
            attempt.getPendingRelease().clear();
  }
  }
  LOG.info("Release request cache is cleaned up");
  }
  }, nmExpireInterval);
  }
  

  // clean up a completed container
  // 清理完成的容器
  protected abstract void completedContainer(RMContainer rmContainer,
  ContainerStatus containerStatus, RMContainerEventType event);
  

  // 清除容器

  protected void>  SchedulerApplicationAttempt attempt) {
  for (ContainerId containerId : containers) {
  // 获取给定containerId的容器。
  RMContainer rmContainer = getRMContainer(containerId);
  if (rmContainer == null) {
  //
  
        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
  < nmExpireInterval) {
  LOG.info(containerId + " doesn't exist. Add the container"

  + " to the>  synchronized (attempt) {
  // Set类的add()函数表示 如果指定的元素不存在,则将其指定的元素添加到这个set(可选操作)。
  // 更正式地,如果set不包含元素<tt> e2 </tt>,则将指定的元素<tt> e </tt>添加到此set,以便
  //<tt>(e==null&nbsp;?&nbsp;e2==null&nbsp;:&nbsp;e.equals(e2))</tt>.
  
            attempt.getPendingRelease().add(containerId);
  }
  } else {
  // logFailure()函数表示 为失败的事件创建可读和可分析的审核日志字符串
  
          RMAuditLogger.logFailure(attempt.getUser(),
  AuditConstants.RELEASE_CONTAINER,
  "Unauthorized access or invalid container", "Scheduler",

  "Trying to>  attempt.getApplicationId(), containerId);
  }
  }
  // 清理完成的容器
  // createAbnormalContainerStatus()函数表示在特殊情况下创建{@link ContainerStatus}的实用程序。
  
      completedContainer(rmContainer,
  SchedulerUtils.createAbnormalContainerStatus(containerId,
  SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
  }
  }
  

  // 获取
  // SchedulerNode类表示 从调度器的角度表示YARN集群节点。
  public SchedulerNode getSchedulerNode(NodeId nodeId) {
  // Map类的get()函数表示 返回指定键映射到的值,如果此映射不包含该键的映射,则返回{@code null}。
  return nodes.get(nodeId);
  }
  

  // 完全排除应用程序的sourceQueue,将其全部移动到destQueue。
  
  @Override
  public synchronized void moveAllApps(String sourceQueue, String destQueue)
  throws YarnException {
  // check if destination queue is a valid leaf queue
  // 检查目标队列是否是有效的叶队列
  try {
  getQueueInfo(destQueue, false, false);
  } catch (IOException e) {
  LOG.warn(e);
  throw new YarnException(e);
  }
  // check if source queue is a valid
  // 检查源队列是否有效
  // getAppsInQueue()函数表示 获取给定队列下的应用程序
  List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
  if (apps == null) {
  String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
  LOG.warn(errMsg);
  throw new YarnException(errMsg);
  }
  // generate move events for each pending/running app
  // 为每个待处理/正在运行的应用生成移动事件
  for (ApplicationAttemptId app : apps) {
  //
  SettableFuture<Object> future = SettableFuture.create();
  // RMAppMoveEvent类构造函数内部有 RMAppEventType.MOVE事件。
  this.rmContext
  .getDispatcher()
  .getEventHandler()
  .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
  }
  }
  // 终止指定队列中的所有应用程序。
  
  @Override
  public synchronized void killAllAppsInQueue(String queueName)
  throws YarnException {
  // check if queue is a valid
  // 检查队列是否有效
  // getAppsInQueue()函数表示 获取给定队列下的应用程序
  List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
  if (apps == null) {
  String errMsg = "The specified Queue: " + queueName + " doesn't exist";
  LOG.warn(errMsg);
  throw new YarnException(errMsg);
  }
  // generate kill events for each pending/running app
  // 为每个待处理/正在运行的应用生成kill事件
  for (ApplicationAttemptId app : apps) {
  this.rmContext
  .getDispatcher()
  .getEventHandler()
  .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
  "Application killed due to expiry of reservation queue " +
  queueName + "."));
  }
  }
  /**
  * Process resource update on a node.
  */
  // 在节点上处理资源更新。
  public synchronized void updateNodeResource(RMNode nm,
  ResourceOption resourceOption) {
  SchedulerNode node = getSchedulerNode(nm.getNodeID());
  Resource newResource = resourceOption.getResource();
  // 获取节点上的总资源。
  Resource oldResource = node.getTotalResource();
  if(!oldResource.equals(newResource)) {
  // Log resource change
  // 日志记录资源更改
  LOG.info("Update resource on node: " + node.getNodeName()
  + " from: " + oldResource + ", to: "
  + newResource);
  

  // Map类的remove()函数表示 从该map中删除一个键的映射,如果存在(可选的操作)。
  // 更正式地,如果该map包含从<tt> k </tt>到值<tt> v </tt>的映射,使得<code>(key==null ? k==null : key.equals(k))</code>,
  // 该映射被删除。(map最多可以包含一个这样的映射。)
  
      nodes.remove(nm.getNodeID());
  //
  
      updateMaximumAllocation(node, false);
  

  // update resource to node
  // 将资源更新到节点
  // 在节点上设置总资源。
  
      node.setTotalResource(newResource);
  

  // Map类的put()函数表示 将指定的值与该映射中的指定键相关联(可选操作)。如果map先前包含了键的映射,则旧值将被指定的值替换。
  
      nodes.put(nm.getNodeID(), (N)node);
  //
  updateMaximumAllocation(node, true);
  

  // update resource to clusterResource
  // 将资源更新到clusterResource
  // subtractFrom(clusterResource, oldResource)表示从clusterResource减去oldResource,资源包括内存和虚拟内核
  
      Resources.subtractFrom(clusterResource, oldResource);
  // addTo(clusterResource, newResource)表示在clusterResource添加newResource,资源包括内存和虚拟内核
  
      Resources.addTo(clusterResource, newResource);
  } else {
  // Log resource change
  // 日志记录资源改变
  LOG.warn("Update resource on node: " + node.getNodeName()
  + " with the same resource: " + newResource);
  }
  }
  

  /** {@inheritDoc} */
  // 返回调度时考虑的资源类型的集合
  
  @Override
  public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
  // EnumSet类的of()函数 创建一个最初包含指定元素的枚举集。
  return EnumSet.of(SchedulerResourceTypes.MEMORY);
  }
  

  // 获取由预留系统管理的队列的名称列表
  
  @Override
  public Set<String> getPlanQueues() throws YarnException {
  // Object类的getClass()函数 返回此{@code Object}的运行时类。

  // 返回的{@code>
  //>  throw new YarnException(getClass().getSimpleName()
  + " does not support reservations");
  }
  

  // 更新最大可分配
  protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
  // 获取节点上的总资源
  Resource totalResource = node.getTotalResource();
  maxAllocWriteLock.lock();
  try {
  if (add) { // added node  //添加节点
  // 获取资源的<em>memory</em>
  int nodeMemory = totalResource.getMemory();
  if (nodeMemory > maxNodeMemory) {
  maxNodeMemory = nodeMemory;
  // 设置资源的<em>memory</em>
  // Math.min()返回两个数的最小值
  
          maximumAllocation.setMemory(Math.min(
  configuredMaximumAllocation.getMemory(), maxNodeMemory));
  }
  // 获取资源的<em>number of virtual cpu cores</em>
  int nodeVCores = totalResource.getVirtualCores();
  if (nodeVCores > maxNodeVCores) {
  maxNodeVCores = nodeVCores;
  maximumAllocation.setVirtualCores(Math.min(
  configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
  }
  } else {  // removed node  //删除节点
  if (maxNodeMemory == totalResource.getMemory()) {
  maxNodeMemory = -1;
  }
  if (maxNodeVCores == totalResource.getVirtualCores()) {
  maxNodeVCores = -1;
  }
  // We only have to iterate through the nodes if the current max memory
  // or vcores was equal to the removed node's
  // 如果当前的最大内存或虚拟内核等于被删除的节点的,我们只需遍历节点
  if (maxNodeMemory == -1 || maxNodeVCores == -1) {
  // A map entry (key-value pair).  entrySet()返回此map中包含的映射的{@link Set}视图。
  for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
  int nodeMemory =
  nodeEntry.getValue().getTotalResource().getMemory();
  if (nodeMemory > maxNodeMemory) {
  maxNodeMemory = nodeMemory;
  }
  int nodeVCores =
  nodeEntry.getValue().getTotalResource().getVirtualCores();
  if (nodeVCores > maxNodeVCores) {
  maxNodeVCores = nodeVCores;
  }
  }
  if (maxNodeMemory == -1) {  // no nodes  //无节点
  
            maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
  } else {
  maximumAllocation.setMemory(
  Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
  }
  if (maxNodeVCores == -1) {  // no nodes   //无节点
  
            maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
  } else {
  maximumAllocation.setVirtualCores(
  Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
  }
  }
  }
  } finally {
  maxAllocWriteLock.unlock();
  }
  }
  

  // 刷新最大可分配
  protected void refreshMaximumAllocation(Resource newMaxAlloc) {
  maxAllocWriteLock.lock();
  try {
  configuredMaximumAllocation = Resources.clone(newMaxAlloc);
  int maxMemory = newMaxAlloc.getMemory();
  if (maxNodeMemory != -1) {
  maxMemory = Math.min(maxMemory, maxNodeMemory);
  }
  int maxVcores = newMaxAlloc.getVirtualCores();
  if (maxNodeVCores != -1) {
  maxVcores = Math.min(maxVcores, maxNodeVCores);
  }
  //
  maximumAllocation = Resources.createResource(maxMemory, maxVcores);
  } finally {
  maxAllocWriteLock.unlock();
  }
  }
  

  // 为应用尝试获取待处理的资源请求
  public List<ResourceRequest> getPendingResourceRequestsForAttempt(
  ApplicationAttemptId attemptId) {
  // 获取应用程序尝试
  SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId);
  if (attempt != null) {
  // getAppSchedulingInfo()获取应用程序调度信息。  getAllResourceRequests()获取所有的资源请求。
  return attempt.getAppSchedulingInfo().getAllResourceRequests();
  }
  return null;
  }
  
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425344-1-1.html 上篇帖子: Hadoop HDFS编程 API入门系列之HdfsUtil版本2(七) 下篇帖子: hadoop map中获取文件/切片名称
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表