[Hadoop][Yarn] ResourceManager Startup Flow: Source Code Analysis
First, here is an error stack captured while locating a problem. A call stack is the most direct record of call information and is a great help when reading the source.
Hadoop in particular makes heavy use of polymorphism, so it is often hard to tell which implementation class a given interface method call actually resolves to.
A call stack shows this clearly, which is very useful when navigating unfamiliar code.
ResourceManager active-transition (failover) flow
ActiveStandbyElector.processResult()
---ActiveStandbyElector.becomeActive()
------ActiveStandbyBasedElectorService.becomeActive()
---------AdminService.transitionToActive()
------------ResourceManager.transitionToActive()
---------------ResourceManager.startActiveServices()
------------------AbstractService.start()
---------------------ResourceManager : RMActiveServices.serviceStart()
------------------------CompositeService.serviceStart()
----------------------------AbstractService.start()
-------------------------------CommonNodeLabelsManager.serviceStart()
---------------------------------- CommonNodeLabelsManager.initNodeLabelStore()
-------------------------------------FileSystemNodeLabelsStore.recover()
----------------------------------------FileSystemNodeLabelsStore.loadFromMirror()
-------------------------------------------CommonNodeLabelsManager.replaceLabelsOnNode()
----------------------------------------------CommonNodeLabelsManager.checkReplaceLabelsOnNode()
ResourceManager startup flow
ResourceManager.main()
//The inheritance chain above shows that ResourceManager is itself a Service. As a Service it has a four-phase lifecycle: NOTINITED, INITED, STARTED, and STOPPED.
ResourceManager's startup consists of two major steps, init and start, so the analysis below focuses on these two.
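The lifecycle contract can be sketched in plain Java. This is a simplified stand-in for illustration only, not the real org.apache.hadoop.service.AbstractService; the class and method names below are invented for the sketch.

```java
// Minimal sketch of the four-phase Service lifecycle
// (NOTINITED -> INITED -> STARTED -> STOPPED). Hypothetical names;
// not the real org.apache.hadoop.service.AbstractService.
public class LifecycleSketch {

    enum State { NOTINITED, INITED, STARTED, STOPPED }

    static class SimpleService {
        private State state = State.NOTINITED;

        void init() {
            if (state != State.NOTINITED)
                throw new IllegalStateException("cannot init from " + state);
            serviceInit();          // subclass hook, like serviceInit() in Hadoop
            state = State.INITED;
        }

        void start() {
            if (state != State.INITED)
                throw new IllegalStateException("cannot start from " + state);
            serviceStart();         // subclass hook, like serviceStart() in Hadoop
            state = State.STARTED;
        }

        void stop() {
            serviceStop();
            state = State.STOPPED;
        }

        protected void serviceInit()  {}
        protected void serviceStart() {}
        protected void serviceStop()  {}

        State getState() { return state; }
    }

    // Walk one service through its whole lifecycle and record each state.
    public static String runLifecycle() {
        SimpleService s = new SimpleService();
        StringBuilder sb = new StringBuilder(s.getState().toString());
        s.init();
        sb.append(",").append(s.getState());
        s.start();
        sb.append(",").append(s.getState());
        s.stop();
        sb.append(",").append(s.getState());
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle());
    }
}
```

The state checks in init() and start() mirror why the services below must be added and started in a fixed order: a composite parent drives each child through the same transitions.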
ResourceManager.init() ---> AbstractService.init()
//1 - Load the configuration files: core-default, core-site, yarn-default, yarn-site.
//2 - Register the services below. They depend on one another and must be started in this order:
// rmDispatcher
// adminService
// electorService
// rmApplicationHistoryWriter
// timelineCollectorManager
// systemMetricsPublisher
ResourceManager.serviceInit(conf)
//During RM initialization, an RMActiveServices object (activeServices) is created and initialized; it manages all live services maintained by the RM.
//Given the inheritance chain ResourceManager -> CompositeService -> AbstractService -> Service, the RM is itself a Service. As a Service it contains many services of its own (tracked by the serviceList field in CompositeService), all of which are handed over to activeServices.
createAndInitActiveServices(false);
RMActiveServices.init(conf)
RMActiveServices.serviceInit()//creates the series of services ResourceManager needs, mainly the following:
rmSecretManagerService = createRMSecretManagerService();
EventMetricsManager eventMetricsManager = createEventMetricsManager();
RMAsyncService rmAsyncService = createRMAsyncService();
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
RMNodeLabelsManager nlm = createNodeLabelManager();
AllocationTagsManager allocationTagsManager = createAllocationTagsManager();
PlacementConstraintManagerService placementConstraintManager = createPlacementConstraintManager();
ResourceProfilesManager resourceProfilesManager = createResourceProfileManager();
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = createRMDelegatedNodeLabelsUpdater();
nmLivelinessMonitor = createNMLivelinessMonitor();
resourceTracker = createResourceTrackerService();
JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
masterService = createApplicationMasterService();
clientRM = createClientRMService();
applicationMasterLauncher = createAMLauncher();
federationStateStoreService = createFederationStateStoreService();
SystemServiceManager systemServiceManager = createServiceManager();
ResourceManager.start() ---> AbstractService.start()
ResourceManager.serviceStart()
ResourceManager.startWepApp()
ResourceManager.serviceStart()
CompositeService.serviceStart()//start every service the RM registered during service initialization
ResourceManager.transitionToActive()
ResourceManager.startActiveServices()//start the active-only services when this RM becomes active; the process mirrors the one above
The above briefly outlines ResourceManager.serviceInit() and ResourceManager.serviceStart(). Here is serviceInit():
protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
/**
* RMContextImpl class holds two services context.
* serviceContext : These services called as <b>Always On</b> services.
* Services that need to run always irrespective of the HA state of the RM.
* activeServiceContext : Active services context. Services that need to run
* only on the Active RM.
* Note: If any new service to be added to context, add it to a right
* context as per above description.
* As that comment makes clear, RMContextImpl records the RM's service information in two contexts; activeServiceContext tracks the services that run only on the active RM.
*/
this.rmContext = new RMContextImpl();
rmContext.setResourceManager(this);
this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
this.configurationProvider.init(this.conf);
rmContext.setConfigurationProvider(configurationProvider);//set the class used to resolve configuration
// load core-site.xml
loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);//load the properties in core-site.xml into the Configuration object
// Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
// Or use RM specific configurations to overwrite the common ones first
// if they exist
RMServerUtils.processRMProxyUsersConf(conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);
// load yarn-site.xml
loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);//load the properties in yarn-site.xml into the Configuration object
validateConfigs(this.conf);//validate the configuration
// Set HA configuration should be done before login
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(this.conf);//verify the configuration required specifically in HA mode
}
// Set UGI and do login
// If security is enabled, use login user
// If security is not enabled, use current user
this.rmLoginUGI = UserGroupInformation.getCurrentUser();
try {
doSecureLogin();
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to login", ie);
}
// register the handlers for all AlwaysOn services using setupDispatcher().
//Create the ResourceManager event dispatcher, named "RM Event dispatcher"; all Always-On services handle their events through it.
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
// The order of services below should not be changed as services will be
// started in same order
// As elector service needs admin service to be initialized and started,
// first we add admin service then elector service
adminService = createAdminService();
addService(adminService);
rmContext.setRMAdminService(adminService);
// elector must be added post adminservice
if (this.rmContext.isHAEnabled()) {
// If the RM is configured to use an embedded leader elector,
// initialize the leader elector.
if (HAUtil.isAutomaticFailoverEnabled(conf)
&& HAUtil.isAutomaticFailoverEmbedded(conf)) {
EmbeddedElector elector = createEmbeddedElector();
addIfService(elector);
rmContext.setLeaderElectorService(elector);
}
}
rmContext.setYarnConfiguration(conf);
//Create and initialize the active services. The services registered on rmContext above are all Always-On services, independent of the RM's HA state; the services created here exist only on the active ResourceManager. createAndInitActiveServices() is covered in detail below.
createAndInitActiveServices(false);
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
YarnConfiguration.RM_BIND_HOST,
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));//address of the ResourceManager web server, which handles REST requests and the native YARN web UI
RMApplicationHistoryWriter rmApplicationHistoryWriter =
createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);//also an Always-On service; it dispatches application-history write events, which the dispatcher above handles on a dedicated thread
// initialize the RM timeline collector first so that the system metrics
// publisher can bind to it
if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
RMTimelineCollectorManager timelineCollectorManager =
createRMTimelineCollectorManager();
addService(timelineCollectorManager);
rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
}
SystemMetricsPublisher systemMetricsPublisher =
createSystemMetricsPublisher();
addIfService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
super.serviceInit(this.conf);
}
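The dispatcher created by setupDispatcher() follows a register-and-route pattern: handlers register per event type, and the dispatcher routes each queued event to its handler. Below is a deliberately simplified, synchronous sketch of that pattern; MiniDispatcher and its methods are invented names, and the real AsyncDispatcher drains its queue on a dedicated thread.

```java
import java.util.*;

// Toy sketch of the register-and-route dispatcher pattern.
// Hypothetical names; not Hadoop's AsyncDispatcher.
public class DispatcherSketch {

    interface EventHandler { void handle(String event); }

    static class MiniDispatcher {
        private final Map<String, EventHandler> handlers = new HashMap<>();
        private final Deque<String[]> queue = new ArrayDeque<>(); // {type, payload}

        // like rmDispatcher.register(EventType.class, handler)
        void register(String eventType, EventHandler handler) {
            handlers.put(eventType, handler);
        }

        void post(String eventType, String payload) {
            queue.add(new String[] { eventType, payload });
        }

        // In Hadoop this drain runs on the "RM Event dispatcher" thread;
        // here it is synchronous to keep the sketch deterministic.
        void dispatchAll() {
            while (!queue.isEmpty()) {
                String[] ev = queue.poll();
                EventHandler h = handlers.get(ev[0]);
                if (h != null) h.handle(ev[1]);
            }
        }
    }

    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniDispatcher d = new MiniDispatcher();
        d.register("NodesListManagerEventType", e -> log.add("nodes:" + e));
        d.register("RMAppEventType", e -> log.add("app:" + e));
        d.post("RMAppEventType", "START");
        d.post("NodesListManagerEventType", "REFRESH");
        d.dispatchAll();              // events routed in FIFO order
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```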
In serviceInit() above, the active services are created and initialized inside createAndInitActiveServices(), which is detailed below:
protected void createAndInitActiveServices(boolean fromActive) {
activeServices = new RMActiveServices(this);
activeServices.fromActive = fromActive;
activeServices.init(conf);
}
After the RMActiveServices object is created, init() is called on it. RMActiveServices is itself a Service, so the base class's init() ultimately calls RMActiveServices.serviceInit():
@Override
protected void serviceInit(Configuration configuration) throws Exception {
standByTransitionRunnable = new StandByTransitionRunnable();
rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService);
EventMetricsManager eventMetricsManager = createEventMetricsManager();
rmContext.setEventMetricsManager(eventMetricsManager);
RMAsyncService rmAsyncService = createRMAsyncService();
addService(rmAsyncService);
rmContext.setRmAsyncService(rmAsyncService);
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
addService(containerAllocationExpirer);
rmContext.setContainerAllocationExpirer(containerAllocationExpirer);
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
rmContext.setAMLivelinessMonitor(amLivelinessMonitor);
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
addService(rmAppLifetimeMonitor);
rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
RMNodeLabelsManager nlm = createNodeLabelManager();
nlm.setRMContext(rmContext);
addService(nlm);
rmContext.setNodeLabelManager(nlm);
AllocationTagsManager allocationTagsManager =
createAllocationTagsManager();
rmContext.setAllocationTagsManager(allocationTagsManager);
PlacementConstraintManagerService placementConstraintManager =
createPlacementConstraintManager();
addService(placementConstraintManager);
rmContext.setPlacementConstraintManager(placementConstraintManager);
// add resource profiles here because it's used by AbstractYarnScheduler
ResourceProfilesManager resourceProfilesManager =
createResourceProfileManager();
resourceProfilesManager.init(conf);
rmContext.setResourceProfilesManager(resourceProfilesManager);
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
createRMDelegatedNodeLabelsUpdater();
if (delegatedNodeLabelsUpdater != null) {
addService(delegatedNodeLabelsUpdater);
rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
}
recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
RMStateStore rmStore = null;
if (recoveryEnabled) {
rmStore = RMStateStoreFactory.getStore(conf);
boolean isWorkPreservingRecoveryEnabled =
conf.getBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
rmContext
.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
} else {
rmStore = new NullRMStateStore();
}
try {
rmStore.setResourceManager(rm);
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
LOG.error("Failed to init state store", e);
throw e;
}
rmContext.setStateStore(rmStore);
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenRenewer = createDelegationTokenRenewer();
rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
}
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager,
rmContext.getEventMetricsManager()
.getNodesListManagerEventTypeMetrics());
addService(nodesListManager);
rmContext.setNodesListManager(nodesListManager);
// Initialize the scheduler
scheduler = createScheduler();
scheduler.setRMContext(rmContext);
addIfService(scheduler);
rmContext.setScheduler(scheduler);
schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher,
rmContext.getEventMetricsManager().getRMSchedulerTypeMetrics());
// Register event handler for RmAppEvents
rmDispatcher.register(RMAppEventType.class,
new ApplicationEventDispatcher(rmContext),
rmContext.getEventMetricsManager().getAppEventTypeMetrics());
// Register event handler for RmAppAttemptEvents
rmDispatcher.register(RMAppAttemptEventType.class,
new ApplicationAttemptEventDispatcher(rmContext),
rmContext.getEventMetricsManager().getAttemptMetrics());
// Register event handler for RmNodes
rmDispatcher.register(RMNodeEventType.class,
new NodeEventDispatcher(rmContext),
rmContext.getEventMetricsManager().getNodeEventTypeMetrics());
nmLivelinessMonitor = createNMLivelinessMonitor();
addService(nmLivelinessMonitor);
resourceTracker = createResourceTrackerService();
addService(resourceTracker);
rmContext.setResourceTrackerService(resourceTracker);
MetricsSystem ms = DefaultMetricsSystem.initialize("ResourceManager");
if (fromActive) {
JvmMetrics.reattach(ms, jvmMetrics);
UserGroupInformation.reattachMetrics();
} else {
jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
}
JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
addService(pauseMonitor);
jvmMetrics.setPauseMonitor(pauseMonitor);
// Initialize the Reservation system
if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
reservationSystem = createReservationSystem();
if (reservationSystem != null) {
reservationSystem.setRMContext(rmContext);
addIfService(reservationSystem);
rmContext.setReservationSystem(reservationSystem);
LOG.info("Initialized Reservation system");
}
}
masterService = createApplicationMasterService();
createAndRegisterOpportunisticDispatcher(masterService);
addService(masterService);
rmContext.setApplicationMasterService(masterService);
applicationACLsManager = new ApplicationACLsManager(conf);
queueACLsManager = createQueueACLsManager(scheduler, conf);
appRecoveryHandler = RMAppsHandler.getRMAppsHandler(rm);
rmContext.setRMAppsHandler(appRecoveryHandler);
rmAppManager = createRMAppManager();
// Register event handler for RMAppManagerEvents
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager,
rmContext.getEventMetricsManager().getAppManagerEventTypeMetrics());
clientRM = createClientRMService();
addService(clientRM);
rmContext.setClientRMService(clientRM);
ClusterMetrics.getMetrics();
applicationMasterLauncher = createAMLauncher();
rmDispatcher.register(AMLauncherEventType.class,
applicationMasterLauncher);
addService(applicationMasterLauncher);
if (UserGroupInformation.isSecurityEnabled()) {
addService(delegationTokenRenewer);
delegationTokenRenewer.setRMContext(rmContext);
}
if(HAUtil.isFederationEnabled(conf)) {
String cId = YarnConfiguration.getClusterId(conf);
if (cId.isEmpty()) {
String errMsg =
"Cannot initialize RM as Federation is enabled"
+ " but cluster id is not configured.";
LOG.error(errMsg);
throw new YarnRuntimeException(errMsg);
}
federationStateStoreService = createFederationStateStoreService();
addIfService(federationStateStoreService);
LOG.info("Initialized Federation membership.");
}
rmnmInfo = new RMNMInfo(rmContext, scheduler);
if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
false)) {
SystemServiceManager systemServiceManager = createServiceManager();
addIfService(systemServiceManager);
}
super.serviceInit(conf);
}
Finally, the serviceInit() of RMActiveServices' base class CompositeService runs: it loops over the registered services and initializes them one by one.
That is everything ResourceManager does during init. In the process it creates a series of services of two kinds: Always-On services, which run regardless of the ResourceManager's active/standby state, and Active services, which exist only on the active ResourceManager (the token-management services, for example).
As noted earlier, a Service's lifecycle has four phases: NOTINITED, INITED, STARTED, and STOPPED, and ResourceManager is no exception. Having covered the first two phases, we now look at the start phase in detail.
As with init, the call originates in main() and reaches ResourceManager.serviceStart() through the base class AbstractService.start():
protected void serviceStart() throws Exception {
if (this.rmContext.isHAEnabled()) {
//Move this ResourceManager to Standby state; even if it was Active before, it transitions to Standby first.
transitionToStandby(false);
}
//start the YARN web app
startWepApp();
//if this is a mini (test) cluster
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
false)) {
int port = webApp.port();
WebAppUtils.setRMWebAppPort(conf, port);
}
super.serviceStart();
// Non HA case, start after RM services are started.
//In the non-HA case, transition this ResourceManager straight to Active.
if (!this.rmContext.isHAEnabled()) {
transitionToActive();
}
}
The most important line is super.serviceStart(), which invokes CompositeService.serviceStart() in ResourceManager's parent class:
protected void serviceStart() throws Exception {
List<Service> services = getServices();
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": starting services, size=" + services.size());
}
for (Service service : services) {
// start the service. If this fails that service
// will be stopped and an exception raised
service.start();
}
super.serviceStart();
}
This method is straightforward: it starts every service created during the ResourceManager's init phase. The services are held in the serviceList member of CompositeService.
A natural question: the init walkthrough above never touched serviceList directly, so where are these services added?
That happens in CompositeService.addService(). ResourceManager calls addService() for each service it creates during init, and the start phase then iterates over the list and starts them one by one.
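The addService()/start-loop pattern can be sketched as follows. This is an illustrative toy (Svc, Composite, and demo are invented names), not Hadoop's CompositeService, but it shows the same idea: services are collected in a list in registration order, and init and start each walk the list in that same order.

```java
import java.util.*;

// Toy model of the CompositeService pattern. Hypothetical names.
public class CompositeSketch {

    static class Svc {
        final String name;
        final List<String> log;
        Svc(String name, List<String> log) { this.name = name; this.log = log; }
        void init()  { log.add("init:" + name); }
        void start() { log.add("start:" + name); }
    }

    static class Composite {
        // like CompositeService.serviceList: insertion order is preserved
        private final List<Svc> serviceList = new ArrayList<>();
        void addService(Svc s) { serviceList.add(s); }
        void init()  { for (Svc s : serviceList) s.init(); }
        void start() { for (Svc s : serviceList) s.start(); }
    }

    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        Composite rm = new Composite();
        rm.addService(new Svc("adminService", log));    // registered first
        rm.addService(new Svc("electorService", log));  // registered second
        rm.init();   // all services init in registration order
        rm.start();  // then all start in the same order
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This is why the order of addService() calls in serviceInit() matters: the elector service can rely on the admin service having been initialized and started first.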
Taking the Capacity scheduler as an example, here is how a scheduler is initialized and started.
Its class declaration gives the inheritance chain CapacityScheduler -> AbstractYarnScheduler -> AbstractService -> Service, so the Capacity scheduler is also a Service, created and initialized during ResourceManager.RMActiveServices.serviceInit().
As described above, the last step of ResourceManager initialization initializes every service that was created, and CapacityScheduler is one of them (assuming the Capacity scheduler is configured).
CapacityScheduler's initialization code:
@Override
public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
super.serviceInit(conf);
initScheduler(configuration);
// Initialize SchedulingMonitorManager
schedulingMonitorManager.initialize(rmContext, conf);
}
CapacityScheduler's startup code:
public void serviceStart() throws Exception {
//start the scheduler threads
startSchedulerThreads();
super.serviceStart();
}
//This method starts all threads related to container allocation.
private void startSchedulerThreads() {
try {
writeLock.lock();
//ActivitiesManager stores the resource-request activity of nodes and applications: the start, add, update, and finish of resource requests.
activitiesManager.start();
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads,
"asyncSchedulerThreads is null");
//The async scheduler threads allocate containers; there is one by default, configurable via a parameter.
for (Thread t : asyncSchedulerThreads) {
t.start();
}
//thread that commits resource-allocation proposals
resourceCommitterService.start();
}
} finally {
writeLock.unlock();
}
}
In 2.9.0/3.0.0, YARN supports asynchronous container allocation, enabled with:
<property>
<name>yarn.scheduler.capacity.schedule-asynchronously.enable</name>
<value>true</value>
</property>
In addition, 2.9.0/3.0.0 YARN supports specifying multiple allocation threads (1 by default) via yarn.scheduler.capacity.schedule-asynchronously.maximum-threads:
<property>
<name>yarn.scheduler.capacity.schedule-asynchronously.maximum-threads</name>
<value>4</value>
</property>
super.serviceStart() in CapacityScheduler lands in AbstractYarnScheduler.serviceStart():
protected void serviceStart() throws Exception {
//the scheduler update thread
if (updateThread != null) {
updateThread.start();
}
schedulingMonitorManager.startAll();
super.serviceStart();
}
The core here is two calls: updateThread.start() and schedulingMonitorManager.startAll().
AbstractYarnScheduler.UpdateThread.run() is shown below:
/**
* Thread which calls {@link #update()} every
* <code>updateInterval</code> milliseconds.
*/
private class UpdateThread extends Thread {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
synchronized (updateThreadMonitor) {
//updateInterval is the period of this timer thread: wait updateInterval milliseconds, then run update() below
updateThreadMonitor.wait(updateInterval);
}
//Lock the scheduler, recompute resource allocation and demand, and check for preemption. The default implementation is empty; FairScheduler overrides it, while CapacityScheduler does not.
update();
} catch (InterruptedException ie) {
LOG.warn("Scheduler UpdateThread interrupted. Exiting.");
return;
} catch (Exception e) {
LOG.error("Exception in scheduler UpdateThread", e);
}
}
}
}
@VisibleForTesting
public void update() {
// do nothing by default
}
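The wait-then-update loop above can be reproduced in a small self-contained sketch. UpdateThreadSketch and runFor are invented names for illustration; the real thread waits on updateThreadMonitor and calls the scheduler's update().

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the UpdateThread pattern: wait(interval) on a monitor, then
// call update(), until interrupted. Hypothetical names, not Hadoop code.
public class UpdateThreadSketch {

    private static final Object monitor = new Object();
    private static final AtomicInteger updates = new AtomicInteger();

    private static void update() { updates.incrementAndGet(); }

    // Run the loop with the given period for roughly totalMs, then interrupt.
    public static int runFor(long intervalMs, long totalMs) {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    synchronized (monitor) {
                        monitor.wait(intervalMs);   // sleep one period (or until notified)
                    }
                    update();                       // periodic work
                } catch (InterruptedException ie) {
                    return;   // mirrors the "Scheduler UpdateThread interrupted" exit path
                }
            }
        });
        t.start();
        try { Thread.sleep(totalMs); } catch (InterruptedException ignored) { }
        t.interrupt();
        try { t.join(); } catch (InterruptedException ignored) { }
        return updates.get();
    }

    public static void main(String[] args) {
        System.out.println("updates=" + runFor(20, 200));
    }
}
```

Waiting on a monitor (rather than plain sleep) lets another thread notify the monitor to trigger an immediate update, which is why the real code uses updateThreadMonitor.wait(updateInterval).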
SchedulingMonitorManager.startAll():
public synchronized void startAll() {
for (SchedulingMonitor schedulingMonitor : runningSchedulingMonitors
.values()) {
schedulingMonitor.start();
}
}
The monitors are kept in a Map<String, SchedulingMonitor> named runningSchedulingMonitors, with one SchedulingMonitor per configured policy:
runningSchedulingMonitors.put(configuredPolicy, new SchedulingMonitor(rmContext, policyInstance));
@Override
public void serviceStart() throws Exception {
LOG.info("Starting SchedulingMonitor=" + getName());
assert !stopped : "starting when already stopped";
ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(getName());
return t;
}
});
schedulePreemptionChecker();
super.serviceStart();
}
private void schedulePreemptionChecker() {
//schedule a fixed-rate periodic task: PolicyInvoker
handler = ses.scheduleAtFixedRate(new PolicyInvoker(),
0, monitorInterval, TimeUnit.MILLISECONDS);
}
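The same fixed-rate scheduling pattern, reduced to a self-contained sketch. MonitorSketch and runChecker are invented names; the real code schedules PolicyInvoker and keeps the returned ScheduledFuture in handler so that a changed interval can cancel and reschedule it.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of SchedulingMonitor's fixed-rate checker. Hypothetical names.
public class MonitorSketch {

    // Run a fixed-rate "policy" for roughly totalMs and count invocations.
    public static int runChecker(long periodMs, long totalMs) {
        AtomicInteger invocations = new AtomicInteger();
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setName("PreemptionCheckerSketch");   // mirrors t.setName(getName()) above
            return t;
        });
        // like schedulePreemptionChecker(): fixed-rate invocation of the policy
        ScheduledFuture<?> handler = ses.scheduleAtFixedRate(
            invocations::incrementAndGet, 0, periodMs, TimeUnit.MILLISECONDS);
        try { Thread.sleep(totalMs); } catch (InterruptedException ignored) { }
        handler.cancel(true);   // the real PolicyInvoker cancels this way on interval change
        ses.shutdownNow();
        return invocations.get();
    }

    public static void main(String[] args) {
        System.out.println("policy invoked " + runChecker(20, 200) + " times");
    }
}
```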
The PolicyInvoker code:
private class PolicyInvoker implements Runnable {
@Override
public void run() {
try {
if (monitorInterval != scheduleEditPolicy.getMonitoringInterval()) {
handler.cancel(true);
monitorInterval = scheduleEditPolicy.getMonitoringInterval();
schedulePreemptionChecker();
} else {
invokePolicy();
}
} catch (Throwable t) {
// The preemption monitor does not alter structures nor do structures
// persist across invocations. Therefore, log, skip, and retry.
LOG.error("Exception raised while executing preemption"
+ " checker, skip this run..., exception=", t);
}
}
}
//The scheduleEditPolicy member is instantiated via reflection from the policies configured in
//yarn.resourcemanager.scheduler.monitor.policies; the default is
//org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy
private final SchedulingEditPolicy scheduleEditPolicy;
public void invokePolicy(){
scheduleEditPolicy.editSchedule();
}
The default policy illustrates how preemption is carried out. ProportionalCapacityPreemptionPolicy.editSchedule():
public synchronized void editSchedule() {
updateConfigIfNeeded();
long startTs = clock.getTime();
CSQueue root = scheduler.getRootQueue();
Resource clusterResources = Resources.clone(scheduler.getClusterResource());
//The key line: select and track all candidate containers to preempt, wait for them to finish, and force-kill any still running when the timeout expires.
containerBasedPreemptOrKill(root, clusterResources);
if (LOG.isDebugEnabled()) {
LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
}
}
This method selects and tracks the containers that are candidates for preemption; a container on the target list that has not finished by the timeout is forcibly killed.
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// Sync killable containers from scheduler when lazy preemption enabled
if (lazyPreempionEnabled) {
syncKillableContainersFromScheduler();
}
// All partitions to look at
Set<String> partitions = new HashSet<>();
partitions.addAll(scheduler.getRMContext()
.getNodeLabelManager().getClusterNodeLabelNames());
partitions.add(RMNodeLabelsManager.NO_LABEL);
this.allPartitions = ImmutableSet.copyOf(partitions);
// extract a summary of the queues from scheduler
synchronized (scheduler) {
queueToPartitions.clear();
//---------------------------------------- Step 1 ----------------------------------------
//Recursively copy queue information starting from the root queue, producing a snapshot of all queues so that changes to cluster queues cannot cause problems during preemption.
for (String partitionToLookAt : allPartitions) {
cloneQueues(root, Resources
.clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)),
partitionToLookAt);
}
// Update effective priority of queues
}
this.leafQueueNames = ImmutableSet.copyOf(getLeafQueueNames(
getQueueByPartition(CapacitySchedulerConfiguration.ROOT,
RMNodeLabelsManager.NO_LABEL)));
// compute total preemption allowed
Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
percentageClusterPreemptionAllowed);
//clear under served queues for every run
partitionToUnderServedQueues.clear();
// based on ideal allocation select containers to be preemptionCandidates from each
// queue and each application
Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
new HashMap<>();
Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
Set<RMContainer>>> toPreemptPerSelector = new HashMap<>();
for (PreemptionCandidatesSelector selector :
candidatesSelectionPolicies) {
long startTime = 0;
if (LOG.isDebugEnabled()) {
LOG.debug(MessageFormat
.format("Trying to use {0} to select preemption candidates",
selector.getClass().getName()));
startTime = clock.getTime();
}
Map<ApplicationAttemptId, Set<RMContainer>> curCandidates =
selector.selectCandidates(toPreempt, clusterResources,
totalPreemptionAllowed);
toPreemptPerSelector.putIfAbsent(selector, curCandidates);
if (LOG.isDebugEnabled()) {
LOG.debug(MessageFormat
.format("{0} uses {1} millisecond to run",
selector.getClass().getName(), clock.getTime() - startTime));
int totalSelected = 0;
int curSelected = 0;
for (Set<RMContainer> set : toPreempt.values()) {
totalSelected += set.size();
}
for (Set<RMContainer> set : curCandidates.values()) {
curSelected += set.size();
}
LOG.debug(MessageFormat
.format("So far, total {0} containers selected to be preempted, {1}"
+ " containers selected this round\n",
totalSelected, curSelected));
}
}
if (LOG.isDebugEnabled()) {
logToCSV(new ArrayList<>(leafQueueNames));
}
// if we are in observeOnly mode return before any action is taken
if (observeOnly) {
return;
}
// TODO: need consider revert killable containers when no more demandings.
// Since we could have several selectors to make decisions concurrently.
// So computed ideal-allocation varies between different selectors.
//
// We may need to "score" killable containers and revert the most preferred
// containers. The bottom line is, we shouldn't preempt a queue which is already
// below its guaranteed resource.
long currentTime = clock.getTime();
pcsMap = toPreemptPerSelector;
// preempt (or kill) the selected containers
preemptOrkillSelectedContainerAfterWait(toPreemptPerSelector, currentTime);
// cleanup staled preemption candidates
cleanupStaledPreemptionCandidates(currentTime);
}
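The preempt-or-kill-after-wait step at the end can be illustrated with a toy model: selected containers start a wait timer, and any container still tracked after the maximum wait is force-killed. Everything below (PreemptSketch, MAX_WAIT_MS, demo) is invented for the sketch; it only mirrors the idea behind preemptOrkillSelectedContainerAfterWait(), not Hadoop's implementation.

```java
import java.util.*;

// Toy sketch of "preempt, then kill after wait". Hypothetical names.
public class PreemptSketch {

    static final long MAX_WAIT_MS = 100;   // stand-in for the preemption wait config

    // candidate container id -> logical time it was first selected for preemption
    private static final Map<String, Long> candidates = new LinkedHashMap<>();
    private static final List<String> killed = new ArrayList<>();

    // One checker round at logical time 'now'.
    public static void preemptOrKill(List<String> selected, long now) {
        for (String c : selected) {
            candidates.putIfAbsent(c, now);   // start (or keep) the wait timer
        }
        Iterator<Map.Entry<String, Long>> it = candidates.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now - e.getValue() >= MAX_WAIT_MS) {   // waited too long: force-kill
                killed.add(e.getKey());
                it.remove();
            }
        }
    }

    public static List<String> demo() {
        candidates.clear();
        killed.clear();
        preemptOrKill(Arrays.asList("c1", "c2"), 0);    // c1, c2 selected at t=0
        preemptOrKill(Arrays.asList("c3"), 50);         // c3 selected at t=50
        preemptOrKill(Collections.emptyList(), 120);    // t=120: c1, c2 exceed the wait
        return new ArrayList<>(killed);
    }

    public static void main(String[] args) {
        System.out.println(demo());   // [c1, c2]
    }
}
```

c3 survives the round at t=120 because only 70 ms have elapsed since it was selected, which mirrors how the real policy gives each selected container a grace period before killing it.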
Copyright notice: this article is original content by a user of the Huawei Cloud community and may not be reproduced without permission from the original author. To report suspected plagiarism in the community, email cloudbbs@huaweicloud.com with supporting evidence; confirmed infringing content will be removed.