[Hadoop] [Yarn] NodeManager: How a Container Is Started

Posted by 沙漠里的果果酱 on 2023/08/10 11:18:28
[Abstract] [Hadoop] [Yarn] NodeManager: how a Container is started

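A Container can be launched in two ways:
1 - The ResourceManager starts the AM (the AM is the first, special Container of an application);
AMLauncher.launch()
2 - The AM launches a Container
Taking the MapReduce framework as an example: ContainerLauncherImpl.Container.launch()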

Either way, the caller uses the NodeManager RPC client to send a startContainers request to the NodeManager:
StartContainersResponse response =  containerMgrProxy.startContainers(allRequests);
The request eventually reaches the NodeManager server side:
ContainerManagerImpl.startContainers()
---ContainerManagerImpl.startContainerInternal()
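Although they are all Containers, different kinds of Container are started in somewhat different ways. There are the following kinds:
1 - the AM;
2 - the first Container on the current node;
3 - any later Container on the current node.
The differences come down to whether the application state recorded in the NodeManager has to be updated, whether local directories have to be initialized, and so on.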
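
The "first Container on the node" distinction can be illustrated with a minimal sketch. It assumes that the NodeManager keeps one record per application and initializes it only when the first Container of that application arrives; the class, method, and event names below are hypothetical and are not taken from the Hadoop source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: only the first container of an application on a node triggers application init. */
class FirstContainerSketch {
  /** Hypothetical stand-in for the per-application record kept by the node. */
  static final class AppRecord {
    final String appId;
    AppRecord(String appId) { this.appId = appId; }
  }

  private final Map<String, AppRecord> applications = new ConcurrentHashMap<>();
  final List<String> events = new ArrayList<>();

  void startContainer(String appId, String containerId) {
    // putIfAbsent returns null only if this node has not seen the application yet.
    if (applications.putIfAbsent(appId, new AppRecord(appId)) == null) {
      events.add("INIT_APPLICATION " + appId);
    }
    // Every container, first or not, still goes through its own initialization.
    events.add("INIT_CONTAINER " + containerId);
  }

  public static void main(String[] args) {
    FirstContainerSketch node = new FirstContainerSketch();
    node.startContainer("app_1", "container_1_01");
    node.startContainer("app_1", "container_1_02");
    node.events.forEach(System.out::println);
  }
}
```

In this model the second Container of app_1 skips the application-level step, which matches the idea that later Containers only need Container-level work.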

The final stages are the same in every case:
1 - localization;
2 - startContainer;
3 - cleanup.


ContainerScheduler.handle()

public void handle(ContainerSchedulerEvent event) {
  switch (event.getType()) {
  case SCHEDULE_CONTAINER:
    scheduleContainer(event.getContainer());
    break;
  // NOTE: Is sent only after container state has changed to PAUSED...
  case CONTAINER_PAUSED:
  // NOTE: Is sent only after container state has changed to DONE...
  case CONTAINER_COMPLETED:
    onResourcesReclaimed(event.getContainer());
    break;
  case UPDATE_CONTAINER:
    if (event instanceof UpdateContainerSchedulerEvent) {
      onUpdateContainer((UpdateContainerSchedulerEvent) event);
    } else {
      LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
    }
    break;
  case SHED_QUEUED_CONTAINERS:
    shedQueuedOpportunisticContainers();
    break;
  case RECOVERY_COMPLETED:
    startPendingContainers(maxOppQueueLength <= 0);
    metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
        queuedGuaranteedContainers.size());
    break;
  default:
    LOG.error("Unknown event arrived at ContainerScheduler: "
        + event.toString());
  }
}

ContainerScheduler.scheduleContainer()

protected void scheduleContainer(Container container) {
  boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
      getExecutionType() == ExecutionType.GUARANTEED;

  // Given a guaranteed container, we enqueue it first and then try to start
  // as many queuing guaranteed containers as possible followed by queuing
  // opportunistic containers based on remaining resources available. If the
  // container still stays in the queue afterwards, we need to preempt just
  // enough number of opportunistic containers.
  if (isGuaranteedContainer) {
    enqueueContainer(container);

    // When opportunistic container not allowed (which is determined by
    // max-queue length of pending opportunistic containers <= 0), start
    // guaranteed containers without looking at available resources.
    boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
    startPendingContainers(forceStartGuaranteedContainers);

    // if the guaranteed container is queued, we need to preempt opportunistic
    // containers for make room for it
    if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
      reclaimOpportunisticContainerResources(container);
    }
  } else {
    // Given an opportunistic container, we first try to start as many queuing
    // guaranteed containers as possible followed by queuing opportunistic
    // containers based on remaining resource available, then enqueue the
    // opportunistic container. If the container is enqueued, we do another
    // pass to try to start the newly enqueued opportunistic container.
    startPendingContainers(false);
    boolean containerQueued = enqueueContainer(container);
    // container may not get queued because the max opportunistic container
    // queue length is reached. If so, there is no point doing another pass
    if (containerQueued) {
      startPendingContainers(false);
    }
  }
  metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
      queuedGuaranteedContainers.size());
}
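
The queueing policy described in the comments of scheduleContainer() can be modelled in a few lines. This is a simplified sketch and not the Hadoop implementation: resources are reduced to an integer number of slots, preemption of running opportunistic containers is left out, and an opportunistic container that finds the queue full is merely recorded as rejected. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the guaranteed/opportunistic two-queue policy. */
class TwoQueueSchedulerSketch {
  private final int maxOppQueueLength;
  private int freeSlots; // may go negative when guaranteed starts are forced
  private final Map<String, Integer> queuedGuaranteed = new LinkedHashMap<>();
  private final Map<String, Integer> queuedOpportunistic = new LinkedHashMap<>();
  final List<String> started = new ArrayList<>();
  final List<String> rejected = new ArrayList<>();

  TwoQueueSchedulerSketch(int freeSlots, int maxOppQueueLength) {
    this.freeSlots = freeSlots;
    this.maxOppQueueLength = maxOppQueueLength;
  }

  void schedule(String id, int slots, boolean guaranteed) {
    if (guaranteed) {
      // Enqueue first, then drain; force the start if opportunistic containers are disabled.
      queuedGuaranteed.put(id, slots);
      startPending(maxOppQueueLength <= 0);
    } else {
      // Give already queued containers a chance before enqueueing the newcomer.
      startPending(false);
      if (queuedOpportunistic.size() < maxOppQueueLength) {
        queuedOpportunistic.put(id, slots);
        startPending(false);
      } else {
        rejected.add(id); // queue full, or opportunistic containers disabled
      }
    }
  }

  /** Frees slots (for example when a container completes) and retries both queues. */
  void release(int slots) {
    freeSlots += slots;
    startPending(maxOppQueueLength <= 0);
  }

  private void startPending(boolean forceGuaranteed) {
    // Opportunistic containers are only considered once every guaranteed one has started.
    if (drain(queuedGuaranteed, forceGuaranteed)) {
      drain(queuedOpportunistic, false);
    }
  }

  /** Starts entries in FIFO order and stops at the first one that does not fit. */
  private boolean drain(Map<String, Integer> queue, boolean force) {
    Iterator<Map.Entry<String, Integer>> it = queue.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Integer> entry = it.next();
      if (!force && entry.getValue() > freeSlots) {
        return false;
      }
      freeSlots -= entry.getValue();
      started.add(entry.getKey());
      it.remove();
    }
    return true;
  }

  public static void main(String[] args) {
    TwoQueueSchedulerSketch scheduler = new TwoQueueSchedulerSketch(4, 2);
    scheduler.schedule("g1", 3, true);
    scheduler.schedule("o1", 2, false);
    scheduler.release(3);
    System.out.println("started=" + scheduler.started);
  }
}
```

Note that the drain stops at the first entry that does not fit, so a small container queued behind a large one waits as well; the same head-of-line behaviour is visible in the real startContainers() loop.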

ContainerScheduler.startPendingContainers()
private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
  // Start guaranteed containers that are paused, if resources available.
  boolean resourcesAvailable = startContainers(
        queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
  // Start opportunistic containers, if resources available.
  if (resourcesAvailable) {
    startContainers(queuedOpportunisticContainers.values(), false);
  }
}

ContainerScheduler.startContainers()

private boolean startContainers(
    Collection<Container> containersToBeStarted, boolean force) {
  Iterator<Container> cIter = containersToBeStarted.iterator();
  boolean resourcesAvailable = true;
  while (cIter.hasNext() && resourcesAvailable) {
    Container container = cIter.next();
    if (tryStartContainer(container, force)) {
      cIter.remove();
    } else {
      resourcesAvailable = false;
    }
  }
  return resourcesAvailable;
}

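When opportunistic container scheduling is enabled, force is false here. Otherwise (maxOppQueueLength <= 0) it is true for guaranteed containers: the container is started immediately, without checking whether the remaining resources can actually accommodate it.

ContainerScheduler.tryStartContainer()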

private boolean tryStartContainer(Container container, boolean force) {
  boolean containerStarted = false;
  // call startContainer without checking available resource when force==true
  if (force || resourceAvailableToStartContainer(
      container)) {
    startContainer(container);
    containerStarted = true;
  }
  return containerStarted;
}
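
The effect of the force flag can be shown with a small resource tracker. This is a sketch under stated assumptions: it tracks only memory and vcores against fixed limits, and the class and method names are hypothetical. It does not reproduce the logic behind resourceAvailableToStartContainer() in Hadoop.

```java
/** Sketch of "check resources unless force is set"; the limits and names are assumptions. */
class ResourceCheckSketch {
  private final long limitMemMb;
  private final int limitVcores;
  private long usedMemMb;
  private int usedVcores;

  ResourceCheckSketch(long limitMemMb, int limitVcores) {
    this.limitMemMb = limitMemMb;
    this.limitVcores = limitVcores;
  }

  /** True if the request fits into what is left of both memory and vcores. */
  boolean fits(long memMb, int vcores) {
    return usedMemMb + memMb <= limitMemMb && usedVcores + vcores <= limitVcores;
  }

  /** Mirrors the shape of tryStartContainer(): force skips the resource check. */
  boolean tryStart(long memMb, int vcores, boolean force) {
    if (force || fits(memMb, vcores)) {
      usedMemMb += memMb;
      usedVcores += vcores;
      return true;
    }
    return false;
  }

  long getUsedMemMb() {
    return usedMemMb;
  }

  public static void main(String[] args) {
    ResourceCheckSketch tracker = new ResourceCheckSketch(4096, 4);
    System.out.println(tracker.tryStart(3072, 2, false));
    System.out.println(tracker.tryStart(2048, 1, false));
    System.out.println(tracker.tryStart(2048, 1, true));
  }
}
```

A forced start can push usage above the configured limit, which is why forcing is reserved for the case where no opportunistic containers compete for the node.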

ContainerScheduler.startContainer()

private void startContainer(Container container) {
  LOG.info("Starting container [" + container.getContainerId()+ "]");
  // Skip to put into runningContainers and addUtilization when recover
  if (!runningContainers.containsKey(container.getContainerId())) {
    runningContainers.put(container.getContainerId(), container);
    this.utilizationTracker.addContainerResources(container);
  }
  if (container.getContainerTokenIdentifier().getExecutionType() ==
      ExecutionType.OPPORTUNISTIC) {
    this.metrics.startOpportunisticContainer(container.getResource());
  }
  container.sendLaunchEvent();
}


ContainerImpl.sendLaunchEvent()

public void sendLaunchEvent() {
  if (ContainerState.PAUSED == getContainerState()) {
    dispatcher.getEventHandler().handle(
        new ContainerResumeEvent(containerId,
            "Container Resumed as some resources freed up"));
  } else {
    ContainersLauncherEventType launcherEvent =
        ContainersLauncherEventType.LAUNCH_CONTAINER;
    if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
      // try to recover a container that was previously launched
      launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
    } else if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
      launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
    }

    containerLaunchStartTime = clock.getTime();
    dispatcher.getEventHandler().handle(
        new ContainersLauncherEvent(this, launcherEvent));
  }

}
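
The branching in sendLaunchEvent() boils down to a small decision function. The sketch below restates it with its own enums so that it runs on its own; the enum names are hypothetical stand-ins for the container state, the recovered status, and the event that gets sent.

```java
/** Sketch of the decision made in sendLaunchEvent(); the enums are local stand-ins. */
class LaunchEventSketch {
  enum State { SCHEDULED, PAUSED }
  enum Recovered { REQUESTED, LAUNCHED, PAUSED }
  enum Decision { RESUME, LAUNCH, RECOVER, RECOVER_PAUSED }

  static Decision choose(State state, Recovered recovered) {
    // A container that is currently paused is resumed, never launched again.
    if (state == State.PAUSED) {
      return Decision.RESUME;
    }
    // After a NodeManager restart, reattach to what was already launched or paused.
    if (recovered == Recovered.LAUNCHED) {
      return Decision.RECOVER;
    }
    if (recovered == Recovered.PAUSED) {
      return Decision.RECOVER_PAUSED;
    }
    // Default case: a fresh launch.
    return Decision.LAUNCH;
  }

  public static void main(String[] args) {
    System.out.println(choose(State.SCHEDULED, Recovered.REQUESTED));
    System.out.println(choose(State.PAUSED, Recovered.REQUESTED));
  }
}
```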

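From the event handlers that ContainerManagerImpl registers with its dispatcher, we can see that ContainersLauncherEventType events are handled by ContainersLauncher.
The registration code is as follows: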

ContainerManagerImpl.ContainerManagerImpl()

containersLauncher = createContainersLauncher(context, exec);
addService(containersLauncher);

dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class, createApplicationEventDispatcher());
dispatcher.register(LocalizationEventType.class, new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc, nmMetricsPublisher));
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
dispatcher.register(ContainerSchedulerEventType.class, containerScheduler);
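
The registration pattern above routes each event to a handler by the class of its event-type enum. The following minimal sketch shows that idea in isolation. It is synchronous and uses hypothetical enum and class names, so it should be read as an illustration of the routing only, not as the YARN dispatcher.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Sketch: route events to handlers keyed by the enum class of the event type. */
class DispatcherSketch {
  enum SchedulerEventType { SCHEDULE_CONTAINER }
  enum LauncherEventType { LAUNCH_CONTAINER }
  enum UnregisteredEventType { SOMETHING }

  private final Map<Class<?>, Consumer<Enum<?>>> handlers = new HashMap<>();

  void register(Class<? extends Enum<?>> eventTypeClass, Consumer<Enum<?>> handler) {
    handlers.put(eventTypeClass, handler);
  }

  void dispatch(Enum<?> eventType) {
    // getDeclaringClass() yields the enum class even for constants with bodies.
    Consumer<Enum<?>> handler = handlers.get(eventType.getDeclaringClass());
    if (handler == null) {
      throw new IllegalStateException("No handler registered for " + eventType);
    }
    handler.accept(eventType);
  }

  /** Registers two handlers, dispatches one event to each, and returns the call log. */
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    DispatcherSketch dispatcher = new DispatcherSketch();
    dispatcher.register(SchedulerEventType.class, e -> log.add("scheduler:" + e));
    dispatcher.register(LauncherEventType.class, e -> log.add("launcher:" + e));
    dispatcher.dispatch(SchedulerEventType.SCHEDULE_CONTAINER);
    dispatcher.dispatch(LauncherEventType.LAUNCH_CONTAINER);
    return log;
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```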


The ContainersLauncher event handler processes the LAUNCH_CONTAINER event as follows:
ContainersLauncher.handle()

public void handle(ContainersLauncherEvent event) {
  // TODO: ContainersLauncher launches containers one by one!!
  Container container = event.getContainer();
  ContainerId containerId = container.getContainerId();
  switch (event.getType()) {
    case LAUNCH_CONTAINER:
      Application app =
        context.getApplications().get(
            containerId.getApplicationAttemptId().getApplicationId());

      ContainerLaunch launch =
          new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
            event.getContainer(), dirsHandler, containerManager);
      containerLauncher.submit(launch);
      running.put(containerId, launch);
      break;
    // ... the cases for the remaining event types are omitted here
  }
}
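
The key point of the LAUNCH_CONTAINER branch is that the launch itself is a Callable handed to a thread pool, so the event handler returns immediately. A minimal sketch of that pattern follows. One deliberate deviation: the sketch keeps the Future in its running map so that the exit code can be read back, whereas the excerpt above stores the ContainerLaunch object. All names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch: submit each launch as a Callable and track it by container id. */
class LauncherPoolSketch {
  // Daemon threads so that a forgotten shutdown() does not keep the JVM alive.
  private final ExecutorService pool = Executors.newCachedThreadPool(runnable -> {
    Thread thread = new Thread(runnable, "launcher-sketch");
    thread.setDaemon(true);
    return thread;
  });
  private final Map<String, Future<Integer>> running = new ConcurrentHashMap<>();

  /** Returns immediately; the task runs on a pool thread. */
  void launch(String containerId, Callable<Integer> launchTask) {
    running.put(containerId, pool.submit(launchTask));
  }

  /** Blocks until the launch task finishes and returns its exit code. */
  int awaitExit(String containerId) {
    Future<Integer> future = running.remove(containerId);
    if (future == null) {
      throw new IllegalArgumentException("Unknown container " + containerId);
    }
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  void shutdown() {
    pool.shutdown();
  }

  public static void main(String[] args) {
    LauncherPoolSketch launcher = new LauncherPoolSketch();
    launcher.launch("container_1", () -> 0);
    System.out.println("exit code: " + launcher.awaitExit("container_1"));
    launcher.shutdown();
  }
}
```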

ContainerLaunch.call()

public Integer call() {
  if (!validateContainerState()) {
    return 0;
  }

  final ContainerLaunchContext launchContext = container.getLaunchContext();
  ContainerId containerID = container.getContainerId();
  String containerIdStr = containerID.toString();
  final List<String> command = launchContext.getCommands();
  int ret = -1;

  Path containerLogDir;
  try {
    Map<Path, List<String>> localResources = getLocalizedResources();

    final String user = container.getUser();
    // /////////////////////////// Variable expansion
    // Before the container script gets written out.
    List<String> newCmds = new ArrayList<String>(command.size());
    String appIdStr = app.getAppId().toString();
    String relativeContainerLogDir = ContainerLaunch
        .getRelativeContainerLogDir(appIdStr, containerIdStr);
    containerLogDir =
        dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
    recordContainerLogDir(containerID, containerLogDir.toString());
    for (String str : command) {
      // TODO: Should we instead work via symlinks without this grammar?
      newCmds.add(expandEnvironment(str, containerLogDir));
    }
    launchContext.setCommands(newCmds);

    // The actual expansion of environment variables happens after calling
    // sanitizeEnv.  This allows variables specified in NM_ADMIN_USER_ENV
    // to reference user or container-defined variables.
    Map<String, String> environment = launchContext.getEnvironment();
    // /////////////////////////// End of variable expansion

    // Use this to track variables that are added to the environment by nm.
    LinkedHashSet<String> nmEnvVars = new LinkedHashSet<String>();

    FileContext lfs = FileContext.getLocalFSFileContext();

    Path nmPrivateContainerScriptPath = dirsHandler.getLocalPathForWrite(
        getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
            + CONTAINER_SCRIPT);
    Path nmPrivateTokensPath = dirsHandler.getLocalPathForWrite(
        getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
            + String.format(TOKEN_FILE_NAME_FMT, containerIdStr));
    Path nmPrivateKeystorePath = dirsHandler.getLocalPathForWrite(
        getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
            + KEYSTORE_FILE);
    Path nmPrivateTruststorePath = dirsHandler.getLocalPathForWrite(
        getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
            + TRUSTSTORE_FILE);
    Path nmPrivateClasspathJarDir = dirsHandler.getLocalPathForWrite(
        getContainerPrivateDir(appIdStr, containerIdStr));

    // Select the working directory for the container
    Path containerWorkDir = deriveContainerWorkDir();
    recordContainerWorkDir(containerID, containerWorkDir.toString());

    // Select a root dir for all csi volumes for the container
    Path csiVolumesRoot = deriveCsiVolumesRootDir();
    recordContainerCsiVolumesRootDir(containerID, csiVolumesRoot.toString());

    String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
    // pid file should be in nm private dir so that it is not
    // accessible by users
    pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
    List<String> localDirs = dirsHandler.getLocalDirs();
    List<String> localDirsForRead = dirsHandler.getLocalDirsForRead();
    List<String> logDirs = dirsHandler.getLogDirs();
    List<String> filecacheDirs = getNMFilecacheDirs(localDirsForRead);
    List<String> userLocalDirs = getUserLocalDirs(localDirs);
    List<String> containerLocalDirs = getContainerLocalDirs(localDirs);
    List<String> containerLogDirs = getContainerLogDirs(logDirs);
    List<String> userFilecacheDirs = getUserFilecacheDirs(localDirsForRead);
    List<String> applicationLocalDirs = getApplicationLocalDirs(localDirs,
        appIdStr);

    if (!dirsHandler.areDisksHealthy()) {
      ret = ContainerExitStatus.DISKS_FAILED;
      throw new IOException("Most of the disks failed. "
          + dirsHandler.getDisksHealthReport(false));
    }
    List<Path> appDirs = new ArrayList<Path>(localDirs.size());
    for (String localDir : localDirs) {
      Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
      Path userdir = new Path(usersdir, user);
      Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
      appDirs.add(new Path(appsdir, appIdStr));
    }

    byte[] keystore = container.getCredentials().getSecretKey(
        AMSecretKeys.YARN_APPLICATION_AM_KEYSTORE);
    if (keystore != null) {
      try (DataOutputStream keystoreOutStream =
               lfs.create(nmPrivateKeystorePath,
                   EnumSet.of(CREATE, OVERWRITE))) {
        keystoreOutStream.write(keystore);
      }
    } else {
      nmPrivateKeystorePath = null;
    }
    byte[] truststore = container.getCredentials().getSecretKey(
        AMSecretKeys.YARN_APPLICATION_AM_TRUSTSTORE);
    if (truststore != null) {
      try (DataOutputStream truststoreOutStream =
               lfs.create(nmPrivateTruststorePath,
                   EnumSet.of(CREATE, OVERWRITE))) {
        truststoreOutStream.write(truststore);
      }
    } else {
      nmPrivateTruststorePath = null;
    }

    // Set the token location too.
    addToEnvMap(environment, nmEnvVars,
        ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
        new Path(containerWorkDir,
            FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());

    // /////////// Write out the container-script in the nmPrivate space.
    try (DataOutputStream containerScriptOutStream =
             lfs.create(nmPrivateContainerScriptPath,
                 EnumSet.of(CREATE, OVERWRITE))) {
      // Sanitize the container's environment
      sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
          containerLogDirs, localResources, nmPrivateClasspathJarDir,
          nmEnvVars);

      expandAllEnvironmentVars(environment, containerLogDir);

      // Add these if needed after expanding so we don't expand key values.
      if (keystore != null) {
        addKeystoreVars(environment, containerWorkDir);
      }
      if (truststore != null) {
        addTruststoreVars(environment, containerWorkDir);
      }

      prepareContainer(localResources, containerLocalDirs);

      // Write out the environment
      exec.writeLaunchEnv(containerScriptOutStream, environment,
          localResources, launchContext.getCommands(),
          containerLogDir, user, nmEnvVars);
    }
    // /////////// End of writing out container-script

    // /////////// Write out the container-tokens in the nmPrivate space.
    try (DataOutputStream tokensOutStream =
             lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
      Credentials creds = container.getCredentials();
      creds.writeTokenStorageToStream(tokensOutStream);
    }
    // /////////// End of writing out container-tokens

    ret = launchContainer(new ContainerStartContext.Builder()
        .setContainer(container)
        .setLocalizedResources(localResources)
        .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
        .setNmPrivateTokensPath(nmPrivateTokensPath)
        .setNmPrivateKeystorePath(nmPrivateKeystorePath)
        .setNmPrivateTruststorePath(nmPrivateTruststorePath)
        .setUser(user)
        .setAppId(appIdStr)
        .setContainerWorkDir(containerWorkDir)
        .setContainerCsiVolumesRootDir(csiVolumesRoot)
        .setLocalDirs(localDirs)
        .setLogDirs(logDirs)
        .setFilecacheDirs(filecacheDirs)
        .setUserLocalDirs(userLocalDirs)
        .setContainerLocalDirs(containerLocalDirs)
        .setContainerLogDirs(containerLogDirs)
        .setUserFilecacheDirs(userFilecacheDirs)
        .setApplicationLocalDirs(applicationLocalDirs).build());
  } catch (ConfigurationException e) {
    LOG.error("Failed to launch container due to configuration error.", e);
    dispatcher.getEventHandler().handle(new ContainerExitEvent(
        containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
        e.getMessage()));
    // Mark the node as unhealthy
    context.getNodeStatusUpdater().reportException(e);
    return ret;
  } catch (Throwable e) {
    LOG.warn("Failed to launch container.", e);
    dispatcher.getEventHandler().handle(new ContainerExitEvent(
        containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
        e.getMessage()));
    return ret;
  } finally {
    setContainerCompletedStatus(ret);
  }

  handleContainerExitCode(ret, containerLogDir);
  return ret;
}
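
One step in call() that is easy to overlook is the variable expansion applied to every command before the launch script is written. The sketch below shows only the log-directory part, assuming the `<LOG_DIR>` placeholder that YARN applications put into their commands; the real expandEnvironment() does more than this, and the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: replace the log-directory placeholder in each launch command. */
class LogDirExpansionSketch {
  static final String LOG_DIR_MARKER = "<LOG_DIR>";

  static List<String> expand(List<String> commands, String containerLogDir) {
    List<String> expanded = new ArrayList<>(commands.size());
    for (String command : commands) {
      // String.replace substitutes every literal occurrence, no regex involved.
      expanded.add(command.replace(LOG_DIR_MARKER, containerLogDir));
    }
    return expanded;
  }

  public static void main(String[] args) {
    List<String> commands = Arrays.asList(
        "java -jar app.jar 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr");
    expand(commands, "/var/log/yarn/container_1").forEach(System.out::println);
  }
}
```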
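
For reference, this is the server-side RPC entry point mentioned at the beginning:
ContainerManagerImpl.startContainers()

/**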
 * Start a list of containers on this NodeManager.
 */
@Override
public StartContainersResponse startContainers(
    StartContainersRequest requests) throws YarnException, IOException {
  UserGroupInformation remoteUgi = getRemoteUgi();
  NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
  authorizeUser(remoteUgi, nmTokenIdentifier);
  List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
  Map<ContainerId, SerializedException> failedContainers =
      new HashMap<ContainerId, SerializedException>();
  // Synchronize with NodeStatusUpdaterImpl#registerWithRM
  // to avoid race condition during NM-RM resync (due to RM restart) while a
  // container is being started, in particular when the container has not yet
  // been added to the containers map in NMContext.
  synchronized (this.context) {
    for (StartContainerRequest request : requests.getStartContainerRequests()) {
      ContainerId containerId = null;
      try {
        if (request.getContainerToken() == null || request.getContainerToken().getIdentifier() == null) {
          throw new IOException(INVALID_CONTAINERTOKEN_MSG);
        }

        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
            .newContainerTokenIdentifier(request.getContainerToken());
        verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
            containerTokenIdentifier);
        containerId = containerTokenIdentifier.getContainerID();

        // Initialize the AMRMProxy service instance only if the container is of
        // type AM and if the AMRMProxy service is enabled
        if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
            .equals(ContainerType.APPLICATION_MASTER)) {
          forwardAMRequest(request, containerId);
        }
        performContainerPreStartChecks(nmTokenIdentifier, request,
            containerTokenIdentifier);
        startContainerInternal(containerTokenIdentifier, request);
        succeededContainers.add(containerId);
      } catch (YarnException e) {
        failedContainers.put(containerId, SerializedException.newInstance(e));
      } catch (InvalidToken ie) {
        failedContainers
            .put(containerId, SerializedException.newInstance(ie));
        throw ie;
      } catch (IOException e) {
        throw RPCUtil.getRemoteException(e);
      }
    }
    return StartContainersResponse
        .newInstance(getAuxServiceMetaData(), succeededContainers,
            failedContainers);
  }
}
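
The loop in startContainers() handles a batch so that a failure of one request is recorded without stopping the others. The sketch below isolates that bookkeeping. It is a simplification with hypothetical names: every failure is modelled as an IllegalStateException and recorded per container, whereas the real method records YarnException per container and lets InvalidToken and IOException abort the call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch: process a batch of start requests and collect per-request outcomes. */
class BatchStartSketch {
  static final class Result {
    final List<String> succeeded = new ArrayList<>();
    final Map<String, String> failed = new LinkedHashMap<>();
  }

  static Result startAll(List<String> containerIds, Set<String> runningOnNode) {
    Result result = new Result();
    for (String id : containerIds) {
      try {
        startOne(id, runningOnNode);
        result.succeeded.add(id);
      } catch (IllegalStateException e) {
        // Record the failure and continue with the next request.
        result.failed.put(id, e.getMessage());
      }
    }
    return result;
  }

  /** Hypothetical single start: rejects an id that is already present on the node. */
  static void startOne(String id, Set<String> runningOnNode) {
    if (!runningOnNode.add(id)) {
      throw new IllegalStateException("Container " + id + " already exists on this node");
    }
  }

  public static void main(String[] args) {
    Result result = startAll(Arrays.asList("c1", "c2", "c1"), new HashSet<>());
    System.out.println("succeeded=" + result.succeeded);
    System.out.println("failed=" + result.failed.keySet());
  }
}
```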
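[Copyright notice] This article is original content by a Huawei Cloud community user and may not be reproduced without permission; to reproduce it, please contact the original author for authorization. If you find content in this community that you suspect is plagiarized, please report it by email and provide the relevant evidence; once verified, the community will immediately remove the infringing content. Report email: cloudbbs@huaweicloud.com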