【Hadoop】【Yarn】NodeManager资源本地化实现

举报
沙漠里的果果酱 发表于 2023/08/10 11:32:54 2023/08/10
【摘要】 【Hadoop】【Yarn】NodeManager资源本地化实现

我们已经知道资源本地化线程是一个单独的服务,Yarn里面最大的服务就是NodeManager。
NodeManager启动之后,会拉起一系列服务,被拉起的服务可能里面又嵌套了服务。资源本地化服务当然也不例外;
和之前一样,在介绍具体的服务之前,我们先梳理一下调用链,看下这个资源本地化服务是如何初始化以及被拉起来的;

NodeManager.serviceInit()
---NodeManager.createContainer()
------ContainerManagerImpl.ContainerManagerImpl()
---------ContainerManagerImpl.createResourceLocalizationService()
这里就到了资源本地化service里面了。
我们看下资源本地化服务的定义:
public class ResourceLocalizationService extends CompositeService
    implements EventHandler<LocalizationEvent>, LocalizationProtocol {

显然这个服务也是一个事件处理器,用来处理状态机的事件;
那么到底什么类型的事件会调用到该事件处理器进行处理呢,在哪里定义的?
就在ContainerManagerImpl的构造方法中。

  ResourceLocalizationService rsrcLocalizationSrvc = createResourceLocalizationService(exec, deletionContext, context, metrics);
  addService(rsrcLocalizationSrvc);

  dispatcher.register(LocalizationEventType.class, new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc, nmMetricsPublisher));


Yarn里面的dispatcher只有一个,就是中央事件处理器,所有用到的地方都是通过某种方式传进来的。
通过将事件注册到中央处理器中,中央处理器接收到事件来决定交给谁去处理,然后调用相应的事件处理器的handle方法;
所以不难理解资源本地化服务的入口其实就是handle()方法

public void handle(LocalizationEvent event) {
  // TODO: create log dir as $logdir/$user/$appId
  switch (event.getType()) {
  case INIT_APPLICATION_RESOURCES:
    handleInitApplicationResources(
        ((ApplicationLocalizationEvent)event).getApplication());
    break;
  case LOCALIZE_CONTAINER_RESOURCES:
    handleInitContainerResources((ContainerLocalizationRequestEvent) event);
    break;
  case CONTAINER_RESOURCES_LOCALIZED:
    handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
    break;
  case CACHE_CLEANUP:
    handleCacheCleanup();
    break;
  case CLEANUP_CONTAINER_RESOURCES:
    handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
    break;
  case DESTROY_APPLICATION_RESOURCES:
    handleDestroyApplicationResources(
        ((ApplicationLocalizationEvent)event).getApplication());
    break;
  default:
    throw new YarnRuntimeException("Unknown localization event: " + event);
  }
}


可以看到这个事件处理器可以处理这6种事件类型。这里以初始化application资源为例进行说明。
这个事件一般是nodemanger节点第一次为这个application申请资源的时候。

private void handleInitApplicationResources(Application app) {
  // 0) Create application tracking structs
  String userName = app.getUser();
  privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName, null, dispatcher, true, super.getConfig(), stateStore, dirsHandler));
  String appIdStr = app.getAppId().toString();
  appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(), app.getAppId(), dispatcher, false, super.getConfig(), stateStore, dirsHandler));
  // 1) Signal container init
  // This is handled by the ApplicationImpl state machine and allows
  // containers to proceed with launching.
  dispatcher.getEventHandler().handle(new ApplicationInitedEvent(app.getAppId()));
}

对于一个第一次申请资源的application而言,有两种类型的资源需要区分处理:
1-私有资源(即用户级别的资源,并不会和某一个application绑定,该用户下所有的application共有);
2-application级别的资源

当然无论是哪一个,都是需要

ResourceLocalizationService服务自己作为一个事件处理器,在初始化的时候又会初始化另一个事件处理器。
下面是ResourceLocalizationService.ServiceInit()相关的代码片段:

localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);

这里初始化了LocalizerTracker服务,并且将其注册到事件中央处理器中。那么中央处理器接收到LocalizerEventType的事件的时候就会去调用事件处理器:LocalizerTracker.handle()
localizerTracker.handle()

public void handle(LocalizerEvent event) {
  String locId = event.getLocalizerId();
  switch (event.getType()) {
  case REQUEST_RESOURCE_LOCALIZATION:
    // 0) find running localizer or start new thread
    LocalizerResourceRequestEvent req =
      (LocalizerResourceRequestEvent)event;
    switch (req.getVisibility()) {
    case PUBLIC:
      publicLocalizer.addResource(req);
      break;
    case PRIVATE:
    case APPLICATION:
      synchronized (privLocalizers) {
        LocalizerRunner localizer = privLocalizers.get(locId);
        if (localizer != null && localizer.killContainerLocalizer.get()) {
          privLocalizers.remove(locId);
          localizer.interrupt();
          localizer = null;
        }
        if (null == localizer) {
         
          if (recentlyCleanedLocalizers.containsKey(locId)) {
            break;
          }
          LOG.info("Created localizer for " + locId);
          localizer = new LocalizerRunner(req.getContext(), locId, privLocalizerParallel);
          privLocalizers.put(locId, localizer);
          localizer.start();
        }
        localizer.addResource(req);
      }
      break;
    }
    break;
  }
}

这里接收到一个时间之后就会启动一个LocalizerRunner线程。
localizerTracker.handle()
---localizerTracker.run()

public void run() {
  Path nmPrivateCTokensPath = null;
  Throwable exception = null;
  try {
    // Get nmPrivateDir
    nmPrivateCTokensPath = dirsHandler.getLocalPathForWrite(NM_PRIVATE_DIR + Path.SEPARATOR + tokenFileName);
    // 0) init queue, etc.
    // 1) write credentials to private dir
    writeCredentials(nmPrivateCTokensPath);
    // 2) exec initApplication and wait
    if (dirsHandler.areDisksHealthy()) {
      exec.startLocalizer(new LocalizerStartContext.Builder()
          .setNmPrivateContainerTokens(nmPrivateCTokensPath)
          .setNmAddr(localizationServerAddress)
          .setUser(context.getUser())
          .setAppId(context.getContainerId()
              .getApplicationAttemptId().getApplicationId().toString())
          .setLocId(localizerId)
          .setDirsHandler(dirsHandler)
          .build());
    } else {
      throw new IOException("All disks failed. " + dirsHandler.getDisksHealthReport(false));
    }
  }

LocalizerTracker线程主要做两件事:
1-将container的token认证信息写到nodemanager本地的nmPrivate目录下面;
2-调用LinuxContainerExecutor.startLocalizer(),真正开始执行本地化过程;

startLolizer()方法中准备好相关的参数之后,最终会调用PrivilegedOperationExecutor.executePrivilegedOperation()


运行本地化服务有两种方式:
1-直接运行ContainerLocalizer.main()
2-

ContainerLocalizer.main()
---ContainerLocalozer.runLocalization()
------ContainerLocalizer.localizeFiles()
---------ResourceLocalizationService.heartbeat()
------------ResourceLocalizationService.LocalizerTracker.processHeartbeat()
---------------ResourceLocalizationService.localRunner.processHeartbeat()
------------------ResourceLocalizationService.localRunner.multipleDownloadPerContainerLocalizer()
---------------------ResourceLocalizationService.localRunner.findNextResource()
------------------------ResouceLocalizationService.getLocalResourcesTracker()

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。