[Hadoop][YARN] ResourceManager Startup Flow: Source Code Analysis

Posted by 沙漠里的果果酱 on 2023/08/10 14:30:37
[Abstract] Source code analysis of the ResourceManager startup flow in Hadoop YARN.
First, a call stack captured while locating a problem: an error stack is the most direct record of call information and is a great help when analyzing source code.
Hadoop in particular makes heavy use of polymorphism, so it is usually hard to tell which implementation class an interface method call actually resolves to.
A call stack lays this information out clearly, which makes it very useful for getting oriented in unfamiliar code.


ResourceManager failover (transition-to-active) flow
ActiveStandbyElector.processResult()
---ActiveStandbyElector.becomeActive()
------ActiveStandbyElectorBasedElectorService.becomeActive()
---------AdminService.transitionToActive()
------------ResourceManager.transitionToActive()
---------------ResourceManager.startActiveServices()
------------------AbstractService.start()
---------------------ResourceManager : RMActiveServices.serviceStart()
------------------------CompositeService.serviceStart()
----------------------------AbstractService.start()
-------------------------------CommonNodeLabelsManager.serviceStart()
---------------------------------- CommonNodeLabelsManager.initNodeLabelStore()
-------------------------------------FileSystemNodeLabelsStore.recover()
----------------------------------------FileSystemNodeLabelsStore.loadFromMirror()
-------------------------------------------CommonNodeLabelsManager.replaceLabelsOnNode()
----------------------------------------------CommonNodeLabelsManager.checkReplaceLabelsOnNode()


    ResourceManager startup flow

    ResourceManager.main()
            //From the inheritance chain ResourceManager -> CompositeService -> AbstractService -> Service, ResourceManager is itself a Service. As a Service, it goes through four lifecycle phases: uninited, init, start, and stop.
              //The two big actions during ResourceManager startup are init and start, so the analysis below walks through these two phases (a minimal lifecycle sketch follows this call tree).
ResourceManager.init() ---> AbstractService.init()
//1 - load the configuration files: core-default, core-site, yarn-default, yarn-site
//2 - register the services below; they depend on one another and must be started in exactly this order.
                  //rmDispatcher
                  //adminService
                  // electorService
                  // rmApplicationHistoryWriter
                  // timelineCollectorManager
                  // systemMetricsPublisher
ResourceManager.serviceInit(conf)
                      //During RM initialization, an RMActiveServices object (activeServices) is created and initialized; it manages all live services maintained by the RM.
                      //Given the inheritance chain ResourceManager -> CompositeService -> AbstractService -> Service, the RM is clearly a Service itself. As a Service it contains many services of its own (maintained in CompositeService's serviceList member), all of which are handed to activeServices.
                      createAndInitActiveServices(false);
                          RMActiveServices.init(conf)
                              RMActiveServices.serviceInit()//This method creates the series of services the ResourceManager needs, mainly the following:
                                        rmSecretManagerService = createRMSecretManagerService();
                                        EventMetricsManager eventMetricsManager = createEventMetricsManager();
                                        RMAsyncService rmAsyncService = createRMAsyncService();
                                        containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
                                        AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
                                        AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
                                        RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
                                        RMNodeLabelsManager nlm = createNodeLabelManager();
                                        AllocationTagsManager allocationTagsManager = createAllocationTagsManager();
                                        PlacementConstraintManagerService placementConstraintManager = createPlacementConstraintManager();
                                        ResourceProfilesManager resourceProfilesManager = createResourceProfileManager();
                                        RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = createRMDelegatedNodeLabelsUpdater();
                                        nmLivelinessMonitor = createNMLivelinessMonitor();
                                        resourceTracker = createResourceTrackerService();
                                        JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
                                        masterService = createApplicationMasterService();
                                        clientRM = createClientRMService();
                                        applicationMasterLauncher = createAMLauncher();
                                        federationStateStoreService = createFederationStateStoreService();
                                        SystemServiceManager systemServiceManager = createServiceManager();                                            
ResourceManager.start() ---> AbstractService.start()
    ResourceManager.serviceStart()
        ResourceManager.startWepApp()
        CompositeService.serviceStart()//super.serviceStart(): start every Service registered during RM initialization


        ResourceManager.transitionToActive()//in the non-HA case, invoked at the end of serviceStart()
            ResourceManager.startActiveServices()//start the active services for promotion to active; the process mirrors the one above

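Before diving in, it helps to see the Service lifecycle contract everything below relies on. The following is a minimal sketch simplified from org.apache.hadoop.service.AbstractService, not the verbatim class; the real class also validates state transitions, records failure causes, and notifies listeners:

import org.apache.hadoop.conf.Configuration;

// Minimal sketch of the Hadoop Service lifecycle, NOT the verbatim class.
public abstract class LifecycleSketch {
  public enum STATE { NOTINITED, INITED, STARTED, STOPPED }

  private volatile STATE state = STATE.NOTINITED;

  // Template methods: subclasses such as ResourceManager override these.
  protected void serviceInit(Configuration conf) throws Exception {}
  protected void serviceStart() throws Exception {}
  protected void serviceStop() throws Exception {}

  public void init(Configuration conf) throws Exception {
    serviceInit(conf);   // subclass hook
    state = STATE.INITED;
  }

  public void start() throws Exception {
    serviceStart();      // subclass hook
    state = STATE.STARTED;
  }

  public void stop() throws Exception {
    serviceStop();       // subclass hook
    state = STATE.STOPPED;
  }

  public STATE getServiceState() {
    return state;
  }
}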
The call trees above briefly outline ResourceManager.serviceInit() and ResourceManager.serviceStart(). The serviceInit() implementation is as follows:
protected void serviceInit(Configuration conf) throws Exception {
  this.conf = conf;
 /**
 * RMContextImpl class holds two services context.
 * serviceContext : These services called as <b>Always On</b> services.
 * Services that need to run always irrespective of the HA state of the RM.
 * activeServiceContext : Active services context. Services that need to run
 * only on the Active RM.
 * Note: If any new service to be added to context, add it to a right
 * context as per above description.
 * As this comment makes clear, RMContextImpl simply records the RM's service
 * information in two groups; activeServiceContext holds the services that run
 * only on the active RM.
 */
  this.rmContext = new RMContextImpl();
  rmContext.setResourceManager(this);


  this.configurationProvider =
      ConfigurationProviderFactory.getConfigurationProvider(conf);
  this.configurationProvider.init(this.conf);
  rmContext.setConfigurationProvider(configurationProvider);// set the provider used to resolve configuration entries


  // load core-site.xml
  loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);// load the entries from core-site.xml into the configuration object


  // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
  // Or use RM specific configurations to overwrite the common ones first
  // if they exist
  RMServerUtils.processRMProxyUsersConf(conf);
  ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);


  // load yarn-site.xml
  loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);// load the entries from yarn-site.xml into the configuration object


  validateConfigs(this.conf);// validate configuration values
  
  // Set HA configuration should be done before login
  this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
  if (this.rmContext.isHAEnabled()) {
    HAUtil.verifyAndSetConfiguration(this.conf);// verify the extra configuration required in HA mode
  }


  // Set UGI and do login
  // If security is enabled, use login user
  // If security is not enabled, use current user
  this.rmLoginUGI = UserGroupInformation.getCurrentUser();
  try {
    doSecureLogin();
  } catch(IOException ie) {
    throw new YarnRuntimeException("Failed to login", ie);
  }


  // register the handlers for all AlwaysOn services using setupDispatcher().
  //Create the ResourceManager event dispatcher, named "RM Event dispatcher"; all always-on services are driven by it.
  rmDispatcher = setupDispatcher();
  addIfService(rmDispatcher);
  rmContext.setDispatcher(rmDispatcher);


  // The order of services below should not be changed as services will be
  // started in same order
  // As elector service needs admin service to be initialized and started,
  // first we add admin service then elector service


  adminService = createAdminService();
  addService(adminService);
  rmContext.setRMAdminService(adminService);


  // elector must be added post adminservice
  if (this.rmContext.isHAEnabled()) {
    // If the RM is configured to use an embedded leader elector,
    // initialize the leader elector.
    if (HAUtil.isAutomaticFailoverEnabled(conf)
        && HAUtil.isAutomaticFailoverEmbedded(conf)) {
      EmbeddedElector elector = createEmbeddedElector();
      addIfService(elector);
      rmContext.setLeaderElectorService(elector);
    }
  }


  rmContext.setYarnConfiguration(conf);
  //Create and initialize the active services. Everything registered on rmContext above is an always-on service, independent of this ResourceManager's HA state; the services created here exist only on the active ResourceManager. createAndInitActiveServices() is dissected below.
  createAndInitActiveServices(false);


  webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
                    YarnConfiguration.RM_BIND_HOST,
                    WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));// the address of the ResourceManager web server, which serves REST requests and the native YARN web UI


  RMApplicationHistoryWriter rmApplicationHistoryWriter =
      createRMApplicationHistoryWriter();
  addService(rmApplicationHistoryWriter);
  rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);// another always-on service: it dispatches application-history write events, which the dispatcher above handles on a dedicated thread


  // initialize the RM timeline collector first so that the system metrics
  // publisher can bind to it
  if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
    RMTimelineCollectorManager timelineCollectorManager =
        createRMTimelineCollectorManager();
    addService(timelineCollectorManager);
    rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
  }


  SystemMetricsPublisher systemMetricsPublisher =
      createSystemMetricsPublisher();
  addIfService(systemMetricsPublisher);
  rmContext.setSystemMetricsPublisher(systemMetricsPublisher);


  super.serviceInit(this.conf);
}
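
For reference, the setupDispatcher() call above roughly does the following (abridged from the Apache Hadoop source: it creates the central AsyncDispatcher and registers the fatal-event handler):

protected Dispatcher createDispatcher() {
  return new AsyncDispatcher("RM Event dispatcher");
}

private Dispatcher setupDispatcher() {
  Dispatcher dispatcher = createDispatcher();
  // fatal errors from any service are funneled to this handler,
  // which can shut the RM down or trigger a failover
  dispatcher.register(RMFatalEventType.class,
      new ResourceManager.RMFatalEventDispatcher());
  return dispatcher;
}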

In the serviceInit() method above, the active services are created and initialized inside createAndInitActiveServices(), which is examined in detail next:
protected void createAndInitActiveServices(boolean fromActive) {
  activeServices = new RMActiveServices(this);
  activeServices.fromActive = fromActive;
  activeServices.init(conf);
}

After the RMActiveServices object is created, its init() method is called. RMActiveServices is itself a Service, so the call goes through the base class AbstractService.init(), which ultimately dispatches to RMActiveServices.serviceInit().
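AbstractService.init() is abridged below from the Hadoop source (the real method also notifies lifecycle listeners after a successful init):

@Override
public void init(Configuration conf) {
  if (conf == null) {
    throw new ServiceStateException("Cannot initialize service "
        + getName() + ": null configuration");
  }
  if (isInState(STATE.INITED)) {
    return;
  }
  synchronized (stateChangeLock) {
    // enterState() enforces the lifecycle state machine,
    // so serviceInit() runs at most once
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);
      try {
        serviceInit(config);   // dispatches to RMActiveServices.serviceInit()
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

RMActiveServices.serviceInit() then creates the active services: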
@Override
protected void serviceInit(Configuration configuration) throws Exception {
  standByTransitionRunnable = new StandByTransitionRunnable();


  rmSecretManagerService = createRMSecretManagerService();
  addService(rmSecretManagerService);


  EventMetricsManager eventMetricsManager = createEventMetricsManager();
  rmContext.setEventMetricsManager(eventMetricsManager);


  RMAsyncService rmAsyncService = createRMAsyncService();
  addService(rmAsyncService);
  rmContext.setRmAsyncService(rmAsyncService);


  containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
  addService(containerAllocationExpirer);
  rmContext.setContainerAllocationExpirer(containerAllocationExpirer);


  AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
  addService(amLivelinessMonitor);
  rmContext.setAMLivelinessMonitor(amLivelinessMonitor);


  AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
  addService(amFinishingMonitor);
  rmContext.setAMFinishingMonitor(amFinishingMonitor);
  
  RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
  addService(rmAppLifetimeMonitor);
  rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);


  RMNodeLabelsManager nlm = createNodeLabelManager();
  nlm.setRMContext(rmContext);
  addService(nlm);
  rmContext.setNodeLabelManager(nlm);


  AllocationTagsManager allocationTagsManager =
      createAllocationTagsManager();
  rmContext.setAllocationTagsManager(allocationTagsManager);


  PlacementConstraintManagerService placementConstraintManager =
      createPlacementConstraintManager();
  addService(placementConstraintManager);
  rmContext.setPlacementConstraintManager(placementConstraintManager);


  // add resource profiles here because it's used by AbstractYarnScheduler
  ResourceProfilesManager resourceProfilesManager =
      createResourceProfileManager();
  resourceProfilesManager.init(conf);
  rmContext.setResourceProfilesManager(resourceProfilesManager);


  RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
      createRMDelegatedNodeLabelsUpdater();
  if (delegatedNodeLabelsUpdater != null) {
    addService(delegatedNodeLabelsUpdater);
    rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
  }


  recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
      YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);


  RMStateStore rmStore = null;
  if (recoveryEnabled) {
    rmStore = RMStateStoreFactory.getStore(conf);
    boolean isWorkPreservingRecoveryEnabled =
        conf.getBoolean(
          YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
          YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
    rmContext
        .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
  } else {
    rmStore = new NullRMStateStore();
  }


  try {
    rmStore.setResourceManager(rm);
    rmStore.init(conf);
    rmStore.setRMDispatcher(rmDispatcher);
  } catch (Exception e) {
    // the Exception from stateStore.init() needs to be handled for
    // HA and we need to give up master status if we got fenced
    LOG.error("Failed to init state store", e);
    throw e;
  }
  rmContext.setStateStore(rmStore);


  if (UserGroupInformation.isSecurityEnabled()) {
    delegationTokenRenewer = createDelegationTokenRenewer();
    rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
  }


  // Register event handler for NodesListManager
  nodesListManager = new NodesListManager(rmContext);
  rmDispatcher.register(NodesListManagerEventType.class, nodesListManager,
      rmContext.getEventMetricsManager()
          .getNodesListManagerEventTypeMetrics());
  addService(nodesListManager);
  rmContext.setNodesListManager(nodesListManager);


  // Initialize the scheduler
  scheduler = createScheduler();
  scheduler.setRMContext(rmContext);
  addIfService(scheduler);
  rmContext.setScheduler(scheduler);


  schedulerDispatcher = createSchedulerEventDispatcher();
  addIfService(schedulerDispatcher);


  rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher,
      rmContext.getEventMetricsManager().getRMSchedulerTypeMetrics());


  // Register event handler for RmAppEvents
  rmDispatcher.register(RMAppEventType.class,
      new ApplicationEventDispatcher(rmContext),
      rmContext.getEventMetricsManager().getAppEventTypeMetrics());


  // Register event handler for RmAppAttemptEvents
  rmDispatcher.register(RMAppAttemptEventType.class,
      new ApplicationAttemptEventDispatcher(rmContext),
      rmContext.getEventMetricsManager().getAttemptMetrics());


  // Register event handler for RmNodes
  rmDispatcher.register(RMNodeEventType.class,
      new NodeEventDispatcher(rmContext),
      rmContext.getEventMetricsManager().getNodeEventTypeMetrics());


  nmLivelinessMonitor = createNMLivelinessMonitor();
  addService(nmLivelinessMonitor);


  resourceTracker = createResourceTrackerService();
  addService(resourceTracker);
  rmContext.setResourceTrackerService(resourceTracker);


  MetricsSystem ms = DefaultMetricsSystem.initialize("ResourceManager");
  if (fromActive) {
    JvmMetrics.reattach(ms, jvmMetrics);
    UserGroupInformation.reattachMetrics();
  } else {
    jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
  }


  JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
  addService(pauseMonitor);
  jvmMetrics.setPauseMonitor(pauseMonitor);


  // Initialize the Reservation system
  if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
      YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
    reservationSystem = createReservationSystem();
    if (reservationSystem != null) {
      reservationSystem.setRMContext(rmContext);
      addIfService(reservationSystem);
      rmContext.setReservationSystem(reservationSystem);
      LOG.info("Initialized Reservation system");
    }
  }


  masterService = createApplicationMasterService();
  createAndRegisterOpportunisticDispatcher(masterService);
  addService(masterService);
  rmContext.setApplicationMasterService(masterService);




  applicationACLsManager = new ApplicationACLsManager(conf);


  queueACLsManager = createQueueACLsManager(scheduler, conf);
  appRecoveryHandler = RMAppsHandler.getRMAppsHandler(rm);
  rmContext.setRMAppsHandler(appRecoveryHandler);
  rmAppManager = createRMAppManager();
  // Register event handler for RMAppManagerEvents
  rmDispatcher.register(RMAppManagerEventType.class, rmAppManager,
      rmContext.getEventMetricsManager().getAppManagerEventTypeMetrics());


  clientRM = createClientRMService();
  addService(clientRM);
  rmContext.setClientRMService(clientRM);


  ClusterMetrics.getMetrics();
  applicationMasterLauncher = createAMLauncher();
  rmDispatcher.register(AMLauncherEventType.class,
      applicationMasterLauncher);


  addService(applicationMasterLauncher);
  if (UserGroupInformation.isSecurityEnabled()) {
    addService(delegationTokenRenewer);
    delegationTokenRenewer.setRMContext(rmContext);
  }


  if(HAUtil.isFederationEnabled(conf)) {
    String cId = YarnConfiguration.getClusterId(conf);
    if (cId.isEmpty()) {
      String errMsg =
          "Cannot initialize RM as Federation is enabled"
              + " but cluster id is not configured.";
      LOG.error(errMsg);
      throw new YarnRuntimeException(errMsg);
    }
    federationStateStoreService = createFederationStateStoreService();
    addIfService(federationStateStoreService);
    LOG.info("Initialized Federation membership.");
  }


  rmnmInfo = new RMNMInfo(rmContext, scheduler);


  if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
      false)) {
    SystemServiceManager systemServiceManager = createServiceManager();
    addIfService(systemServiceManager);
  }


  super.serviceInit(conf);
}


Finally, CompositeService.serviceInit() (the base class of RMActiveServices) runs: it iterates over all registered services and initializes them one by one.
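That loop is short; abridged from CompositeService in the Hadoop source:

protected void serviceInit(Configuration conf) throws Exception {
  List<Service> services = getServices();
  if (LOG.isDebugEnabled()) {
    LOG.debug(getName() + ": initing services, size=" + services.size());
  }
  for (Service service : services) {
    // init each child service; a failure here aborts the composite's init
    service.init(conf);
  }
  super.serviceInit(conf);
}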

That is everything ResourceManager does during init. Along the way it creates a series of services of two kinds: always-on services, which stay resident regardless of the ResourceManager's active/standby state, and active services, which exist only on the active ResourceManager (the token-management services, for example).
As noted earlier, a Service's lifecycle has four phases: uninited, init, start, and stop, and ResourceManager, being a Service, is no exception. The uninited and init phases were covered above; the start phase is examined in detail next.


Like init, start is driven from main(); the call eventually goes through the base class AbstractService.start(), which invokes ResourceManager.serviceStart():
protected void serviceStart() throws Exception {
  if (this.rmContext.isHAEnabled()) {
    //In HA mode, first transition this ResourceManager to Standby (even if it was previously Active, it moves to Standby here).
    transitionToStandby(false);
  }
  //start the YARN web app
  startWepApp();
  //mini YARN cluster handling (used by tests): publish the actual web app port back into the config
  if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
      false)) {
    int port = webApp.port();
    WebAppUtils.setRMWebAppPort(conf, port);
  }
  super.serviceStart();


  // Non HA case, start after RM services are started.
  //In a non-HA cluster, transition this ResourceManager straight to Active.
  if (!this.rmContext.isHAEnabled()) {
    transitionToActive();
  }
}
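In that non-HA branch, transitionToActive() essentially starts the active services and flips the HA state. Below is a condensed sketch, not the verbatim Hadoop method; the real code runs startActiveServices() under the RM login UGI and reinitializes on failure:

// Condensed sketch of ResourceManager.transitionToActive().
synchronized void transitionToActive() throws Exception {
  if (rmContext.getHAServiceState() == HAServiceState.ACTIVE) {
    LOG.info("Already in active state");
    return;
  }
  LOG.info("Transitioning to active state");
  startActiveServices();   // starts all RMActiveServices children
  rmContext.setHAServiceState(HAServiceState.ACTIVE);
  LOG.info("Transitioned to active state");
}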
The most important line in serviceStart() itself, though, is super.serviceStart(), which invokes CompositeService.serviceStart() in the ResourceManager's parent class. The code is as follows:
protected void serviceStart() throws Exception {
  List<Service> services = getServices();
  if (LOG.isDebugEnabled()) {
    LOG.debug(getName() + ": starting services, size=" + services.size());
  }
  for (Service service : services) {
    // start the service. If this fails that service
    // will be stopped and an exception raised
    service.start();
  }
  super.serviceStart();
}

This method does something very simple: it starts every service created during the ResourceManager initialization phase described above. The services are kept in the serviceList member of CompositeService.


At this point some readers may wonder: the ResourceManager initialization walkthrough above never touched serviceList directly, so where do these services get added?
That happens in CompositeService.addService(). The code is as follows:
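Abridged from the Hadoop source:

protected void addService(Service service) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding service " + service.getName());
  }
  synchronized (serviceList) {
    // serviceList is the member that CompositeService.serviceStart()
    // later iterates over
    serviceList.add(service);
  }
}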


ResourceManager calls this parent-class CompositeService.addService() to register every Service it creates during initialization; the start phase then iterates over them and starts each one.

The Capacity Scheduler is used below as an example of how a scheduler is initialized and started.


The Capacity Scheduler's class declaration gives the inheritance chain CapacityScheduler -> AbstractYarnScheduler -> AbstractService -> Service.
So the Capacity Scheduler is also a Service, created and initialized during ResourceManager.RMActiveServices.serviceInit().
As described above, the final step of ResourceManager initialization initializes every Service that was created, and CapacityScheduler is one of them (assuming the Capacity Scheduler is the configured scheduler).
CapacityScheduler's initialization code is as follows:
@Override
public void serviceInit(Configuration conf) throws Exception {
  Configuration configuration = new Configuration(conf);
  super.serviceInit(conf);
  initScheduler(configuration);
  // Initialize SchedulingMonitorManager
  schedulingMonitorManager.initialize(rmContext, conf);
}


CapacityScheduler's start code is as follows:
public void serviceStart() throws Exception {
  //start the scheduler-related threads
  startSchedulerThreads();
  super.serviceStart();
}
//This method's main job is to start all threads related to container allocation.
private void startSchedulerThreads() {
  try {
    writeLock.lock();


    //ActivitiesManager stores resource-request activity for nodes and applications, mainly the start, add, update, and finish operations on requests.
    activitiesManager.start();
    if (scheduleAsynchronously) {
      Preconditions.checkNotNull(asyncSchedulerThreads,
          "asyncSchedulerThreads is null");


      //asyncSchedulerThreads are the threads that allocate containers; there is one by default, and the count is configurable.
      for (Thread t : asyncSchedulerThreads) {
        t.start();
      }


      //the service thread that commits resource-allocation proposals
      resourceCommitterService.start();
    }
  } finally {
    writeLock.unlock();
  }
}
Since versions 2.9.0/3.0.0, YARN supports configuring the number of container-allocation threads via the parameter yarn.scheduler.capacity.schedule-asynchronously.maximum-threads. Asynchronous scheduling itself is enabled with:
<property>
  <name>yarn.scheduler.capacity.schedule-asynchronously.enable</name>
  <value>true</value>
</property>




In addition, YARN 2.9.0/3.0.0 supports specifying multiple threads (1 by default) to allocate containers:
<property>
  <name>yarn.scheduler.capacity.schedule-asynchronously.maximum-threads</name>
  <value>4</value>
</property>


CapacityScheduler's super.serviceStart() call then runs the base class AbstractYarnScheduler.serviceStart():
protected void serviceStart() throws Exception {
  //start the periodic update thread, if one was created
  if (updateThread != null) {
    updateThread.start();
  }
  schedulingMonitorManager.startAll();
  super.serviceStart();
}

The two key calls here are updateThread.start() and schedulingMonitorManager.startAll().
AbstractYarnScheduler.UpdateThread.run() is as follows:
/**
 * Thread which calls {@link #update()} every
 * <code>updateInterval</code> milliseconds.
 */
private class UpdateThread extends Thread {
  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        synchronized (updateThreadMonitor) {
          //updateInterval is the period of this timer thread; the update() below runs once every updateInterval milliseconds
          updateThreadMonitor.wait(updateInterval);
        }
        //lock the scheduler, recompute resource allocation and demand, and check whether preemption is needed. The default implementation is empty; FairScheduler overrides it, CapacityScheduler does not.
        update();
      } catch (InterruptedException ie) {
        LOG.warn("Scheduler UpdateThread interrupted. Exiting.");
        return;
      } catch (Exception e) {
        LOG.error("Exception in scheduler UpdateThread", e);
      }
    }
  }
}


@VisibleForTesting
public void update() {
  // do nothing by default
}


SchedulingMonitorManager.startAll() is as follows:
public synchronized void startAll() {
  for (SchedulingMonitor schedulingMonitor : runningSchedulingMonitors
      .values()) {
    schedulingMonitor.start();
  }
}




The monitors live in a Map<String, SchedulingMonitor> runningSchedulingMonitors, populated with one entry per configured policy:
runningSchedulingMonitors.put(configuredPolicy, new SchedulingMonitor(rmContext, policyInstance));
Each SchedulingMonitor then starts through its serviceStart() method:
@Override
public void serviceStart() throws Exception {
  LOG.info("Starting SchedulingMonitor=" + getName());
  assert !stopped : "starting when already stopped";
  ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setName(getName());
      return t;
    }
  });
  schedulePreemptionChecker();
  super.serviceStart();
}


private void schedulePreemptionChecker() {
  //schedule a recurring timer task: PolicyInvoker
  handler = ses.scheduleAtFixedRate(new PolicyInvoker(),
      0, monitorInterval, TimeUnit.MILLISECONDS);
}

The PolicyInvoker code is as follows:


private class PolicyInvoker implements Runnable {
  @Override
  public void run() {
    try {
      if (monitorInterval != scheduleEditPolicy.getMonitoringInterval()) {
        handler.cancel(true);
        monitorInterval = scheduleEditPolicy.getMonitoringInterval();
        schedulePreemptionChecker();
      } else {
        invokePolicy();
      }
    } catch (Throwable t) {
      // The preemption monitor does not alter structures nor do structures
      // persist across invocations. Therefore, log, skip, and retry.
      LOG.error("Exception raised while executing preemption"
          + " checker, skip this run..., exception=", t);
    }
  }
}
//The scheduleEditPolicy member is instantiated by reflection from the policy classes configured in yarn.resourcemanager.scheduler.monitor.policies; the default is
//org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.
private final SchedulingEditPolicy scheduleEditPolicy;


public void invokePolicy() {
  scheduleEditPolicy.editSchedule();
}
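For context, the scheduling monitor only runs when it is enabled in yarn-site.xml; the standard keys are:

<property>
  <name>yarn.resourcemanager.scheduler.monitor.enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.scheduler.monitor.policies</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
</property>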

The default policy is used below to show how preemption actually happens. The entry point is editSchedule():
public synchronized void editSchedule() {
  updateConfigIfNeeded();


  long startTs = clock.getTime();


  CSQueue root = scheduler.getRootQueue();
  Resource clusterResources = Resources.clone(scheduler.getClusterResource());
  //The key line: select and track all candidate containers for preemption; wait for them to finish, and forcibly kill any that are still running when the timeout expires.
  containerBasedPreemptOrKill(root, clusterResources);


  if (LOG.isDebugEnabled()) {
    LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
  }
}

This method selects and tracks the containers that are candidates for preemption. Any container on the target list that has not finished within the timeout is forcibly killed.
private void containerBasedPreemptOrKill(CSQueue root,
    Resource clusterResources) {
  // Sync killable containers from scheduler when lazy preemption enabled
  if (lazyPreempionEnabled) {
    syncKillableContainersFromScheduler();
  }


  // All partitions to look at
  Set<String> partitions = new HashSet<>();
  partitions.addAll(scheduler.getRMContext()
      .getNodeLabelManager().getClusterNodeLabelNames());
  partitions.add(RMNodeLabelsManager.NO_LABEL);
  this.allPartitions = ImmutableSet.copyOf(partitions);


  // extract a summary of the queues from scheduler
  synchronized (scheduler) {
    queueToPartitions.clear();
    //---------------------------------------- Step 1 --------------------------------------------------------
    //Recursively clone queue information starting from the root queue. This produces a snapshot of all queues so that changes to the cluster's queue state during preemption cannot cause inconsistencies.
    for (String partitionToLookAt : allPartitions) {
      cloneQueues(root, Resources
              .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)),
          partitionToLookAt);
    }


    // Update effective priority of queues
  }


  this.leafQueueNames = ImmutableSet.copyOf(getLeafQueueNames(
      getQueueByPartition(CapacitySchedulerConfiguration.ROOT,
          RMNodeLabelsManager.NO_LABEL)));
  // compute total preemption allowed
  Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
      percentageClusterPreemptionAllowed);


  //clear under served queues for every run
  partitionToUnderServedQueues.clear();


  // based on ideal allocation select containers to be preemptionCandidates from each
  // queue and each application
  Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
      new HashMap<>();
  Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
      Set<RMContainer>>> toPreemptPerSelector = new HashMap<>();
  for (PreemptionCandidatesSelector selector :
      candidatesSelectionPolicies) {
    long startTime = 0;
    if (LOG.isDebugEnabled()) {
      LOG.debug(MessageFormat
          .format("Trying to use {0} to select preemption candidates",
              selector.getClass().getName()));
      startTime = clock.getTime();
    }
    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates =
        selector.selectCandidates(toPreempt, clusterResources,
            totalPreemptionAllowed);
    toPreemptPerSelector.putIfAbsent(selector, curCandidates);


    if (LOG.isDebugEnabled()) {
      LOG.debug(MessageFormat
          .format("{0} uses {1} millisecond to run",
              selector.getClass().getName(), clock.getTime() - startTime));
      int totalSelected = 0;
      int curSelected = 0;
      for (Set<RMContainer> set : toPreempt.values()) {
        totalSelected += set.size();
      }
      for (Set<RMContainer> set : curCandidates.values()) {
        curSelected += set.size();
      }
      LOG.debug(MessageFormat
          .format("So far, total {0} containers selected to be preempted, {1}"
                  + " containers selected this round\n",
              totalSelected, curSelected));
    }
  }


  if (LOG.isDebugEnabled()) {
    logToCSV(new ArrayList<>(leafQueueNames));
  }


  // if we are in observeOnly mode return before any action is taken
  if (observeOnly) {
    return;
  }


  // TODO: need consider revert killable containers when no more demandings.
  // Since we could have several selectors to make decisions concurrently.
  // So computed ideal-allocation varies between different selectors.
  //
  // We may need to "score" killable containers and revert the most preferred
  // containers. The bottom line is, we shouldn't preempt a queue which is already
  // below its guaranteed resource.


  long currentTime = clock.getTime();


  pcsMap = toPreemptPerSelector;


  // preempt (or kill) the selected containers
  preemptOrkillSelectedContainerAfterWait(toPreemptPerSelector, currentTime);


  // cleanup staled preemption candidates
  cleanupStaledPreemptionCandidates(currentTime);
}
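The fields seen above map onto standard Capacity Scheduler preemption settings: monitorInterval, the kill timeout, percentageClusterPreemptionAllowed, and observeOnly correspond to the keys below (stock defaults shown):

<!-- how often the policy runs (monitorInterval), in ms -->
<property>
  <name>yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval</name>
  <value>3000</value>
</property>
<!-- grace period before a selected container is force-killed, in ms -->
<property>
  <name>yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill</name>
  <value>15000</value>
</property>
<!-- percentageClusterPreemptionAllowed: max fraction of cluster resources preemptable per round -->
<property>
  <name>yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round</name>
  <value>0.1</value>
</property>
<!-- observeOnly: log candidates without actually preempting -->
<property>
  <name>yarn.resourcemanager.monitor.capacity.preemption.observe_only</name>
  <value>false</value>
</property>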