[Hadoop][Yarn] How NodeManager Implements Resource Localization
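We already know that resource localization runs as a separate service. On a YARN worker node, the top-level service is the NodeManager.
Once the NodeManager starts, it brings up a series of services, and those services may in turn contain nested services. The resource localization service is no exception.
As before, before looking at the service itself, let's trace the call chain to see how the resource localization service is initialized and started: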
NodeManager.serviceInit()
---NodeManager.createContainerManager()
------ContainerManagerImpl.ContainerManagerImpl()
---------ContainerManagerImpl.createResourceLocalizationService()
This brings us into the resource localization service.
Here is how the service is declared:
public class ResourceLocalizationService extends CompositeService
implements EventHandler<LocalizationEvent>, LocalizationProtocol {
// ... fields and methods omitted in this excerpt
}
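This service is therefore also an event handler: it implements EventHandler<LocalizationEvent> and processes the events that drive the state machines. It also implements LocalizationProtocol, the RPC interface through which the localizer reports its status via heartbeat().
Which event types are routed to this handler, and where is that wired up?
The answer is in the ContainerManagerImpl constructor: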
ResourceLocalizationService rsrcLocalizationSrvc = createResourceLocalizationService(exec, deletionContext, context, metrics);
addService(rsrcLocalizationSrvc);
dispatcher.register(LocalizationEventType.class, new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc, nmMetricsPublisher));
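There is only one dispatcher here: the central event dispatcher, which is the AsyncDispatcher created by ContainerManagerImpl. Every component that uses it receives it by reference rather than creating its own.
Each event type is registered with the central dispatcher. When the dispatcher receives an event, it decides which handler the event belongs to and calls that handler's handle() method. In this case, LocalizationEventType events are delivered to the LocalizationEventHandlerWrapper, which forwards them to rsrcLocalizationSrvc.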
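To make the routing concrete, here is a minimal synchronous sketch of a central dispatcher. It borrows the core idea of YARN's AsyncDispatcher, which is to look up a handler by the enum class that declares the event's type. The real AsyncDispatcher queues events and delivers them on its own thread. SimpleEvent, SimpleHandler, CentralDispatcher, DispatcherDemo and the two toy enums are hypothetical names for this illustration only, not YARN APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal stand-ins for YARN's Event and EventHandler interfaces.
interface SimpleEvent {
    Enum<?> getType();
}

interface SimpleHandler {
    void handle(SimpleEvent event);
}

// Toy enums named after the YARN event-type enums; only a few constants kept.
enum LocalizationEventType { INIT_APPLICATION_RESOURCES, CACHE_CLEANUP }
enum LocalizerEventType { REQUEST_RESOURCE_LOCALIZATION }

// Synchronous sketch of a central dispatcher: one handler per event-type enum class.
class CentralDispatcher {
    private final Map<Class<?>, SimpleHandler> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> typeClass, SimpleHandler handler) {
        handlers.put(typeClass, handler);
    }

    void dispatch(SimpleEvent event) {
        // Look up the handler by the enum class that declares the event's type.
        Class<?> typeClass = event.getType().getDeclaringClass();
        SimpleHandler handler = handlers.get(typeClass);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + typeClass.getSimpleName());
        }
        handler.handle(event);
    }
}

class DispatcherDemo {
    public static void main(String[] args) {
        CentralDispatcher dispatcher = new CentralDispatcher();
        List<String> log = new ArrayList<>();
        // Register one handler per event-type enum, as ContainerManagerImpl does.
        dispatcher.register(LocalizationEventType.class, e -> log.add("localization service: " + e.getType()));
        dispatcher.register(LocalizerEventType.class, e -> log.add("localizer tracker: " + e.getType()));

        dispatcher.dispatch(() -> LocalizationEventType.INIT_APPLICATION_RESOURCES);
        dispatcher.dispatch(() -> LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
        log.forEach(System.out::println);
    }
}
```

The sketch keys on getDeclaringClass() rather than getClass(). This keeps the lookup correct even for enum constants that have their own class bodies.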
So the entry point of the resource localization service is its handle() method:
public void handle(LocalizationEvent event) {
// TODO: create log dir as $logdir/$user/$appId
switch (event.getType()) {
case INIT_APPLICATION_RESOURCES:
handleInitApplicationResources(
((ApplicationLocalizationEvent)event).getApplication());
break;
case LOCALIZE_CONTAINER_RESOURCES:
handleInitContainerResources((ContainerLocalizationRequestEvent) event);
break;
case CONTAINER_RESOURCES_LOCALIZED:
handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
break;
case CACHE_CLEANUP:
handleCacheCleanup();
break;
case CLEANUP_CONTAINER_RESOURCES:
handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
break;
case DESTROY_APPLICATION_RESOURCES:
handleDestroyApplicationResources(
((ApplicationLocalizationEvent)event).getApplication());
break;
default:
throw new YarnRuntimeException("Unknown localization event: " + event);
}
}
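This handler covers six event types. We will use INIT_APPLICATION_RESOURCES (initializing an application's resources) as the example.
This event is normally raised the first time this NodeManager node handles resources for a given application.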
private void handleInitApplicationResources(Application app) {
// 0) Create application tracking structs
String userName = app.getUser();
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName, null, dispatcher, true, super.getConfig(), stateStore, dirsHandler));
String appIdStr = app.getAppId().toString();
appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(), app.getAppId(), dispatcher, false, super.getConfig(), stateStore, dirsHandler));
// 1) Signal container init
// This is handled by the ApplicationImpl state machine and allows
// containers to proceed with launching.
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(app.getAppId()));
}
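For an application that requests resources on this node for the first time, two kinds of resources are handled separately:
1-Private resources (user-level resources that are not tied to any single application and are shared by all of that user's applications);
2-Application-level resources.
Each kind is tracked by a LocalResourcesTrackerImpl. privateRsrc holds one tracker per user name, appRsrc holds one per application ID, and putIfAbsent ensures each tracker is registered only once. Finally, an ApplicationInitedEvent is dispatched so that the ApplicationImpl state machine can let the containers proceed with launching.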
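The effect of the two putIfAbsent calls can be sketched with plain concurrent maps. ResourceTracker and TrackerRegistry are hypothetical stand-ins that keep only the owner fields of LocalResourcesTrackerImpl. The map names mirror privateRsrc and appRsrc from the code above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for LocalResourcesTrackerImpl: records only its owner.
class ResourceTracker {
    final String user;
    final String appId; // null for a user-level (private) tracker

    ResourceTracker(String user, String appId) {
        this.user = user;
        this.appId = appId;
    }
}

class TrackerRegistry {
    // user name -> tracker for PRIVATE resources, shared by all of the user's apps
    final ConcurrentMap<String, ResourceTracker> privateRsrc = new ConcurrentHashMap<>();
    // application ID -> tracker for APPLICATION resources
    final ConcurrentMap<String, ResourceTracker> appRsrc = new ConcurrentHashMap<>();

    void initApplicationResources(String user, String appId) {
        // computeIfAbsent builds a tracker only if none exists yet for the key.
        privateRsrc.computeIfAbsent(user, u -> new ResourceTracker(u, null));
        appRsrc.computeIfAbsent(appId, id -> new ResourceTracker(user, id));
    }
}
```

The sketch uses computeIfAbsent rather than putIfAbsent. A call such as putIfAbsent(key, new ...) constructs a tracker every time and discards it if the key already exists. computeIfAbsent only builds one when the key is missing, and the resulting map contents are the same. Calling initApplicationResources("alice", ...) for two different example application IDs leaves one private tracker for alice and two application trackers.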
ResourceLocalizationService is itself an event handler, and during initialization it also sets up another event handler.
Here is the relevant snippet from ResourceLocalizationService.serviceInit():
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
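This creates the LocalizerTracker service, adds it as a child service, and registers it with the central dispatcher. Whenever the dispatcher receives a LocalizerEventType event, it calls this handler: LocalizerTracker.handle().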
localizerTracker.handle()
public void handle(LocalizerEvent event) {
String locId = event.getLocalizerId();
switch (event.getType()) {
case REQUEST_RESOURCE_LOCALIZATION:
// 0) find running localizer or start new thread
LocalizerResourceRequestEvent req =
(LocalizerResourceRequestEvent)event;
switch (req.getVisibility()) {
case PUBLIC:
publicLocalizer.addResource(req);
break;
case PRIVATE:
case APPLICATION:
synchronized (privLocalizers) {
LocalizerRunner localizer = privLocalizers.get(locId);
if (localizer != null && localizer.killContainerLocalizer.get()) {
privLocalizers.remove(locId);
localizer.interrupt();
localizer = null;
}
if (null == localizer) {
if (recentlyCleanedLocalizers.containsKey(locId)) {
break;
}
LOG.info("Created localizer for " + locId);
localizer = new LocalizerRunner(req.getContext(), locId, privLocalizerParallel);
privLocalizers.put(locId, localizer);
localizer.start();
}
localizer.addResource(req);
}
break;
}
break;
}
}
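On receiving a REQUEST_RESOURCE_LOCALIZATION event, the tracker queues PUBLIC resources on the shared publicLocalizer. For PRIVATE and APPLICATION resources, it reuses the LocalizerRunner registered for that localizer ID. If there is none, or the existing one has been marked for killing, it starts a new LocalizerRunner thread. The exception is a localizer ID that was recently cleaned up, in which case the request is dropped.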
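The routing rules in LocalizerTracker.handle() can be modeled without real threads. In this hypothetical sketch, Runner stands in for LocalizerRunner, and its killed flag plays the role of killContainerLocalizer. publicQueue stands in for publicLocalizer, and Visibility is a toy enum with the three visibility values. For simplicity the whole method is synchronized, whereas the original only locks privLocalizers on the PRIVATE/APPLICATION path. The original also interrupts a killed runner; this sketch simply drops it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy enum mirroring the three resource visibilities.
enum Visibility { PUBLIC, PRIVATE, APPLICATION }

// Hypothetical per-container localizer; the real LocalizerRunner is a Thread.
class Runner {
    final String locId;
    final List<String> pending = new ArrayList<>();
    boolean killed; // stands in for killContainerLocalizer

    Runner(String locId) {
        this.locId = locId;
    }
}

class TrackerSketch {
    final List<String> publicQueue = new ArrayList<>();        // shared public localizer
    final Map<String, Runner> privLocalizers = new HashMap<>(); // one runner per localizer ID
    final Set<String> recentlyCleaned = new HashSet<>();
    int runnersCreated;

    synchronized void request(String locId, Visibility vis, String resource) {
        if (vis == Visibility.PUBLIC) {
            publicQueue.add(resource);
            return;
        }
        Runner runner = privLocalizers.get(locId);
        if (runner != null && runner.killed) {
            // A killed runner is discarded and replaced below.
            privLocalizers.remove(locId);
            runner = null;
        }
        if (runner == null) {
            if (recentlyCleaned.contains(locId)) {
                return; // container already cleaned up: drop the request
            }
            runner = new Runner(locId);
            privLocalizers.put(locId, runner);
            runnersCreated++;
        }
        runner.pending.add(resource);
    }
}
```

Two requests for the same example ID "container_1" share one Runner, while a PUBLIC request never creates one.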
LocalizerTracker.handle()
---LocalizerRunner.run()
public void run() {
Path nmPrivateCTokensPath = null;
Throwable exception = null;
try {
// Get nmPrivateDir
nmPrivateCTokensPath = dirsHandler.getLocalPathForWrite(NM_PRIVATE_DIR + Path.SEPARATOR + tokenFileName);
// 0) init queue, etc.
// 1) write credentials to private dir
writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
if (dirsHandler.areDisksHealthy()) {
exec.startLocalizer(new LocalizerStartContext.Builder()
.setNmPrivateContainerTokens(nmPrivateCTokensPath)
.setNmAddr(localizationServerAddress)
.setUser(context.getUser())
.setAppId(context.getContainerId()
.getApplicationAttemptId().getApplicationId().toString())
.setLocId(localizerId)
.setDirsHandler(dirsHandler)
.build());
} else {
throw new IOException("All disks failed. " + dirsHandler.getDisksHealthReport(false));
}
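} catch (Exception e) {
// Excerpt: the real method has more detailed error handling here
exception = e;
} finally {
// ... cleanup and reporting of `exception` omitted in this excerpt
}
}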
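The LocalizerRunner thread mainly does two things:
1-Writes the container's token credentials into the nmPrivate directory in the NodeManager's local dirs;
2-If the local disks are healthy, calls exec.startLocalizer() to actually start the localization process (this is LinuxContainerExecutor.startLocalizer() when the LinuxContainerExecutor is configured). If the disks are not healthy, it throws an IOException instead.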
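The ordering in LocalizerRunner.run() can be sketched with java.nio.file: write the tokens file first, then launch only if the disks are healthy. Everything here is a hypothetical simplification. LocalizerLauncher stands in for ContainerExecutor.startLocalizer(), and the credentials are raw bytes rather than a Hadoop Credentials object. Disk health is a plain boolean instead of a LocalDirsHandlerService check. Only the directory name nmPrivate comes from the text above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical launcher standing in for ContainerExecutor.startLocalizer().
interface LocalizerLauncher {
    void start(Path tokensFile, String user, String appId, String locId) throws IOException;
}

class RunnerSteps {
    // Mirrors the two steps of LocalizerRunner.run(): write tokens, then launch.
    static Path runOnce(Path localDir, String tokenFileName, byte[] credentials,
                        boolean disksHealthy, LocalizerLauncher launcher,
                        String user, String appId, String locId) throws IOException {
        // 1) write credentials into the nmPrivate directory under a local dir
        Path nmPrivate = Files.createDirectories(localDir.resolve("nmPrivate"));
        Path tokens = nmPrivate.resolve(tokenFileName);
        Files.write(tokens, credentials);
        // 2) only start the localizer if the local disks are healthy
        if (!disksHealthy) {
            throw new IOException("All disks failed.");
        }
        launcher.start(tokens, user, appId, locId);
        return tokens;
    }
}
```

As in the original, the tokens file is written before the health check. A failed check therefore still leaves the file behind, and the launcher is never called.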
After preparing the arguments, startLocalizer() eventually calls PrivilegedOperationExecutor.executePrivilegedOperation().
There are two ways to run the localizer:
1-Run ContainerLocalizer.main() directly;
2-
ContainerLocalizer.main()
---ContainerLocalizer.runLocalization()
------ContainerLocalizer.localizeFiles()
---------ResourceLocalizationService.heartbeat()  (via the LocalizationProtocol RPC)
------------ResourceLocalizationService.LocalizerTracker.processHeartbeat()
---------------ResourceLocalizationService.LocalizerRunner.processHeartbeat()
------------------ResourceLocalizationService.LocalizerRunner.multipleDownloadPerContainerLocalizer()
---------------------ResourceLocalizationService.LocalizerRunner.findNextResource()
------------------------ResourceLocalizationService.getLocalResourcesTracker()
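The bottom half of this chain is a pull-style heartbeat loop. The localizer process repeatedly calls heartbeat() and reports what it has finished. The NodeManager answers with either more resources to fetch or an instruction to exit. The sketch below simulates that loop in a single process. Action mirrors the LIVE/DIE actions of the real protocol. Reply, NodeManagerSide, LocalizerSide and the per-heartbeat `parallelism` cap are hypothetical, and the cap is not claimed to match the exact semantics of privLocalizerParallel. Every "download" trivially succeeds.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Mirrors the idea of the real LIVE/DIE localizer actions.
enum Action { LIVE, DIE }

// Hypothetical heartbeat reply: an action plus resources to fetch next.
final class Reply {
    final Action action;
    final List<String> toFetch;

    Reply(Action action, List<String> toFetch) {
        this.action = action;
        this.toFetch = toFetch;
    }
}

// NM side: hands out pending resources, at most `parallelism` per heartbeat.
class NodeManagerSide {
    private final Deque<String> pending;
    private final List<String> localized = new ArrayList<>();
    private final int parallelism;

    NodeManagerSide(List<String> resources, int parallelism) {
        this.pending = new ArrayDeque<>(resources);
        this.parallelism = parallelism;
    }

    Reply heartbeat(List<String> finished) {
        localized.addAll(finished); // record what the localizer reported
        if (pending.isEmpty()) {
            return new Reply(Action.DIE, List.of()); // nothing left: tell it to exit
        }
        List<String> next = new ArrayList<>();
        while (!pending.isEmpty() && next.size() < parallelism) {
            next.add(pending.poll()); // loosely analogous to findNextResource()
        }
        return new Reply(Action.LIVE, next);
    }

    List<String> localized() {
        return localized;
    }
}

// Localizer side: loop until told to DIE, "downloading" what it was given.
class LocalizerSide {
    static int run(NodeManagerSide nm) {
        List<String> finished = new ArrayList<>();
        int heartbeats = 0;
        while (true) {
            Reply reply = nm.heartbeat(finished);
            heartbeats++;
            if (reply.action == Action.DIE) {
                return heartbeats;
            }
            finished = new ArrayList<>(reply.toFetch); // pretend every download succeeds
        }
    }
}
```

With three resources and a cap of 2, the loop takes three heartbeats. The first receives two resources, the second reports them and receives the third, and the third reports the last one and receives DIE.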