【Hadoop】【Yarn】NodeManager之Container是如何启动的
我们知道拉起Container有两种途径:
1-ResourceManager启动AM(AM作为一个app第一个特殊的Container);
AMLauncher.launch()
2-AM拉起Container
以mapreduce框架为例:ContainerLauncherImpl.Container.run()
无论哪一种,都是通过NodeManager的RPC客户端给NodeManager发送一个startContainers请求:
StartContainersResponse response = containerMgrProxy.startContainers(allRequests);
最终到达NodeManager服务端:
ContainerManagerImpl.startContainers()
---ContainerManagerImpl.startContainerInternal()
虽然都是Container,但是不同类型的container启动的方式不尽相同,有下面几种类型:
1-AM;
2-当前节点上第一个Container;
3-当前节点上非第一个Container;
以上的过程差异无非是要不要更新NodeManager中记录的APP状态,要不要初始化本地目录等。
最后的过程都是一样:
1-localization;
2-startContainer
3-cleanup
ContainerScheduler.handle()
/**
 * Routes a scheduler event to the routine that handles its type.
 * Event types without a matching case are logged as errors and otherwise
 * ignored.
 *
 * @param event the scheduler event to process
 */
public void handle(ContainerSchedulerEvent event) {
  switch (event.getType()) {
  case SCHEDULE_CONTAINER:
    scheduleContainer(event.getContainer());
    return;

  // NOTE: Is sent only after container state has changed to PAUSED...
  case CONTAINER_PAUSED:
  // NOTE: Is sent only after container state has changed to DONE...
  case CONTAINER_COMPLETED:
    // Both event types are handled by the same routine.
    onResourcesReclaimed(event.getContainer());
    return;

  case UPDATE_CONTAINER:
    // Only the dedicated update event subtype can be processed here.
    if (!(event instanceof UpdateContainerSchedulerEvent)) {
      LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
      return;
    }
    onUpdateContainer((UpdateContainerSchedulerEvent) event);
    return;

  case SHED_QUEUED_CONTAINERS:
    shedQueuedOpportunisticContainers();
    return;

  case RECOVERY_COMPLETED:
    // Guaranteed containers are force-started when maxOppQueueLength <= 0.
    startPendingContainers(maxOppQueueLength <= 0);
    metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
        queuedGuaranteedContainers.size());
    return;

  default:
    LOG.error("Unknown event arrived at ContainerScheduler: " + event);
  }
}
ContainerScheduler.scheduleContainer()
/**
 * Queues the given container and tries to start whatever is pending,
 * following a different order for guaranteed and non-guaranteed containers.
 * The queued-container metrics are refreshed at the end in both cases.
 *
 * @param container the container to schedule
 */
protected void scheduleContainer(Container container) {
  ExecutionType executionType =
      container.getContainerTokenIdentifier().getExecutionType();

  if (executionType != ExecutionType.GUARANTEED) {
    // Opportunistic path: drain what is already queued first (guaranteed
    // containers, then opportunistic ones, as far as resources allow), and
    // only then try to enqueue the newcomer.
    startPendingContainers(false);
    // Enqueueing can be refused once the opportunistic queue is full; a
    // second pass is only worthwhile when the container really got queued.
    if (enqueueContainer(container)) {
      startPendingContainers(false);
    }
  } else {
    // Guaranteed path: enqueue first, then start as many queued containers
    // as possible.
    enqueueContainer(container);
    // With opportunistic containers disallowed (max queue length <= 0),
    // guaranteed containers are started without checking free resources.
    startPendingContainers(maxOppQueueLength <= 0);
    // Still queued afterwards: opportunistic containers have to be
    // preempted to make room for it.
    if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
      reclaimOpportunisticContainerResources(container);
    }
  }

  metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
      queuedGuaranteedContainers.size());
}
ContainerScheduler.startPendingContainers()
/**
 * Starts queued guaranteed containers and, if that pass did not run out of
 * resources, queued opportunistic containers as well.
 *
 * @param forceStartGuaranteedContaieners when true, guaranteed containers are
 *        started without checking available resources; opportunistic
 *        containers are never force-started
 */
private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
  // Guaranteed containers go first.
  if (!startContainers(queuedGuaranteedContainers.values(),
      forceStartGuaranteedContaieners)) {
    // The guaranteed pass stopped early, so the opportunistic queue is
    // not attempted.
    return;
  }
  startContainers(queuedOpportunisticContainers.values(), false);
}
ContainerScheduler.startContainers()
/**
 * Starts the given containers in iteration order, removing each started
 * container from the collection, and stops at the first one that cannot be
 * started.
 *
 * @param containersToBeStarted containers to start; started entries are
 *        removed through the collection's iterator
 * @param force passed through to {@code tryStartContainer}
 * @return true if every container was started (or the collection was empty),
 *         false as soon as one could not be started
 */
private boolean startContainers(
    Collection<Container> containersToBeStarted, boolean force) {
  for (Iterator<Container> pending = containersToBeStarted.iterator();
      pending.hasNext();) {
    if (!tryStartContainer(pending.next(), force)) {
      // Leave this container and all remaining ones in the collection.
      return false;
    }
    pending.remove();
  }
  return true;
}
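ContainerScheduler.tryStartContainer()
在允许机会性容器排队的场景下(maxOppQueueLength > 0),这里的force是false,启动前需要先检查剩余资源;反之(maxOppQueueLength <= 0),启动GUARANTEED容器时force为true,即不用检查剩余资源到底能不能拉起container,直接强行启动。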
/**
 * Starts the container if it is forced or if resources are available for it.
 *
 * @param container the container to start
 * @param force when true, the resource availability check is skipped
 * @return true if the container was started, false otherwise
 */
private boolean tryStartContainer(Container container, boolean force) {
  // The resource check is only consulted when the start is not forced.
  if (!force && !resourceAvailableToStartContainer(container)) {
    return false;
  }
  startContainer(container);
  return true;
}
ContainerScheduler.startContainer()
/**
 * Registers the container as running, updates utilization and metrics, and
 * asks the container to send its launch event.
 *
 * @param container the container to start
 */
private void startContainer(Container container) {
  LOG.info("Starting container [" + container.getContainerId()+ "]");

  // A container that is already tracked (the recovery case) must not be
  // added to runningContainers or counted in the utilization again.
  boolean alreadyTracked =
      runningContainers.containsKey(container.getContainerId());
  if (!alreadyTracked) {
    runningContainers.put(container.getContainerId(), container);
    utilizationTracker.addContainerResources(container);
  }

  boolean opportunistic = ExecutionType.OPPORTUNISTIC
      == container.getContainerTokenIdentifier().getExecutionType();
  if (opportunistic) {
    metrics.startOpportunisticContainer(container.getResource());
  }

  container.sendLaunchEvent();
}
ContainerImpl.sendLaunchEvent()
/**
 * Dispatches the event that gets this container running: a resume event when
 * the container is currently PAUSED, otherwise a launcher event whose type
 * depends on the recovered status.
 */
public void sendLaunchEvent() {
  if (getContainerState() == ContainerState.PAUSED) {
    dispatcher.getEventHandler().handle(
        new ContainerResumeEvent(containerId,
            "Container Resumed as some resources freed up"));
    return;
  }

  final ContainersLauncherEventType launcherEvent;
  if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
    // try to recover a container that was previously launched
    launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
  } else if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
    launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
  } else {
    // Any other recovered status gets a regular launch.
    launcherEvent = ContainersLauncherEventType.LAUNCH_CONTAINER;
  }

  // The launch start time is recorded just before the event is dispatched.
  containerLaunchStartTime = clock.getTime();
  dispatcher.getEventHandler().handle(
      new ContainersLauncherEvent(this, launcherEvent));
}
根据状态机ContainerManagerImpl中注册的事件处理器,可以看到ContainersLauncherEventType事件会被ContainersLauncher处理。
事件注册部分代码如下:
ContainerManagerImpl.ContainerManagerImpl()
// Excerpt from the constructor: create the containers launcher and add it as
// a service of this container manager.
containersLauncher = createContainersLauncher(context, exec);
addService(containersLauncher);
// Register one handler per event type class with the dispatcher.
dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class, createApplicationEventDispatcher());
dispatcher.register(LocalizationEventType.class, new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc, nmMetricsPublisher));
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
// ContainersLauncherEventType events are handled by the containersLauncher
// created above.
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
dispatcher.register(ContainerSchedulerEventType.class, containerScheduler);
事件处理器ContainersLauncher中LAUNCH_CONTAINER事件处理逻辑如下:
ContainersLauncher.handle()
/**
 * Handles a containers-launcher event. Only the LAUNCH_CONTAINER case is
 * shown in this excerpt; the closing brace of the method is not part of it.
 *
 * @param event the launcher event, carrying the container it refers to
 */
public void handle(ContainersLauncherEvent event) {
// TODO: ContainersLauncher launches containers one by one!!
Container container = event.getContainer();
ContainerId containerId = container.getContainerId();
switch (event.getType()) {
case LAUNCH_CONTAINER:
// Look up the application that owns the container by its application id.
Application app =
context.getApplications().get(
containerId.getApplicationAttemptId().getApplicationId());
// Wrap the launch in a ContainerLaunch task, submit it to containerLauncher
// and remember it in the running map under the container id.
ContainerLaunch launch =
new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
event.getContainer(), dirsHandler, containerManager);
containerLauncher.submit(launch);
running.put(containerId, launch);
break;
}
ContainerLaunch.call()
/**
 * Prepares everything the container needs on local disk (launch script,
 * token file, optional keystore/truststore, environment) and then launches
 * the container.
 *
 * @return 0 if validateContainerState() rejects the launch; otherwise the
 *         value of ret, which starts at -1, becomes
 *         ContainerExitStatus.DISKS_FAILED when the disks are unhealthy, and
 *         is finally set to the result of launchContainer(...)
 */
public Integer call() {
// Nothing is launched when the container state check fails.
if (!validateContainerState()) {
return 0;
}
final ContainerLaunchContext launchContext = container.getLaunchContext();
ContainerId containerID = container.getContainerId();
String containerIdStr = containerID.toString();
final List<String> command = launchContext.getCommands();
// Exit code reported if the launch fails before launchContainer returns.
int ret = -1;
Path containerLogDir;
try {
Map<Path, List<String>> localResources = getLocalizedResources();
final String user = container.getUser();
// /////////////////////////// Variable expansion
// Before the container script gets written out.
List<String> newCmds = new ArrayList<String>(command.size());
String appIdStr = app.getAppId().toString();
String relativeContainerLogDir = ContainerLaunch
.getRelativeContainerLogDir(appIdStr, containerIdStr);
containerLogDir =
dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
recordContainerLogDir(containerID, containerLogDir.toString());
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(expandEnvironment(str, containerLogDir));
}
// The launch context now carries the expanded commands.
launchContext.setCommands(newCmds);
// The actual expansion of environment variables happens after calling
// sanitizeEnv. This allows variables specified in NM_ADMIN_USER_ENV
// to reference user or container-defined variables.
Map<String, String> environment = launchContext.getEnvironment();
// /////////////////////////// End of variable expansion
// Use this to track variables that are added to the environment by nm.
LinkedHashSet<String> nmEnvVars = new LinkedHashSet<String>();
FileContext lfs = FileContext.getLocalFSFileContext();
// Paths for the script, tokens, keystore and truststore, all resolved
// under the container's private directory.
Path nmPrivateContainerScriptPath = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ CONTAINER_SCRIPT);
Path nmPrivateTokensPath = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ String.format(TOKEN_FILE_NAME_FMT, containerIdStr));
Path nmPrivateKeystorePath = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ KEYSTORE_FILE);
Path nmPrivateTruststorePath = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ TRUSTSTORE_FILE);
Path nmPrivateClasspathJarDir = dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr));
// Select the working directory for the container
Path containerWorkDir = deriveContainerWorkDir();
recordContainerWorkDir(containerID, containerWorkDir.toString());
// Select a root dir for all csi volumes for the container
Path csiVolumesRoot = deriveCsiVolumesRootDir();
recordContainerCsiVolumesRootDir(containerID, csiVolumesRoot.toString());
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
// pid file should be in nm private dir so that it is not
// accessible by users
pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
// Directory lists that are handed to the container start context below.
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> localDirsForRead = dirsHandler.getLocalDirsForRead();
List<String> logDirs = dirsHandler.getLogDirs();
List<String> filecacheDirs = getNMFilecacheDirs(localDirsForRead);
List<String> userLocalDirs = getUserLocalDirs(localDirs);
List<String> containerLocalDirs = getContainerLocalDirs(localDirs);
List<String> containerLogDirs = getContainerLogDirs(logDirs);
List<String> userFilecacheDirs = getUserFilecacheDirs(localDirsForRead);
List<String> applicationLocalDirs = getApplicationLocalDirs(localDirs,
appIdStr);
// Abort with DISKS_FAILED as the exit code when the disks are unhealthy;
// the IOException is handled by the catch (Throwable) block below.
if (!dirsHandler.areDisksHealthy()) {
ret = ContainerExitStatus.DISKS_FAILED;
throw new IOException("Most of the disks failed. "
+ dirsHandler.getDisksHealthReport(false));
}
// One application directory per local dir:
// <localDir>/USERCACHE/<user>/APPCACHE/<appId>.
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
for (String localDir : localDirs) {
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, user);
Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
appDirs.add(new Path(appsdir, appIdStr));
}
// The keystore is written only when the credentials carry it; otherwise
// its path is cleared so that null reaches the start context.
byte[] keystore = container.getCredentials().getSecretKey(
AMSecretKeys.YARN_APPLICATION_AM_KEYSTORE);
if (keystore != null) {
try (DataOutputStream keystoreOutStream =
lfs.create(nmPrivateKeystorePath,
EnumSet.of(CREATE, OVERWRITE))) {
keystoreOutStream.write(keystore);
}
} else {
nmPrivateKeystorePath = null;
}
// Same handling for the truststore.
byte[] truststore = container.getCredentials().getSecretKey(
AMSecretKeys.YARN_APPLICATION_AM_TRUSTSTORE);
if (truststore != null) {
try (DataOutputStream truststoreOutStream =
lfs.create(nmPrivateTruststorePath,
EnumSet.of(CREATE, OVERWRITE))) {
truststoreOutStream.write(truststore);
}
} else {
nmPrivateTruststorePath = null;
}
// Set the token location too.
addToEnvMap(environment, nmEnvVars,
ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
new Path(containerWorkDir,
FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
// /////////// Write out the container-script in the nmPrivate space.
try (DataOutputStream containerScriptOutStream =
lfs.create(nmPrivateContainerScriptPath,
EnumSet.of(CREATE, OVERWRITE))) {
// Sanitize the container's environment
sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
containerLogDirs, localResources, nmPrivateClasspathJarDir,
nmEnvVars);
expandAllEnvironmentVars(environment, containerLogDir);
// Add these if needed after expanding so we don't expand key values.
if (keystore != null) {
addKeystoreVars(environment, containerWorkDir);
}
if (truststore != null) {
addTruststoreVars(environment, containerWorkDir);
}
prepareContainer(localResources, containerLocalDirs);
// Write out the environment
exec.writeLaunchEnv(containerScriptOutStream, environment,
localResources, launchContext.getCommands(),
containerLogDir, user, nmEnvVars);
}
// /////////// End of writing out container-script
// /////////// Write out the container-tokens in the nmPrivate space.
try (DataOutputStream tokensOutStream =
lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
Credentials creds = container.getCredentials();
creds.writeTokenStorageToStream(tokensOutStream);
}
// /////////// End of writing out container-tokens
// Everything is prepared: launch the container and keep its result as
// the exit code.
ret = launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setLocalizedResources(localResources)
.setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
.setNmPrivateTokensPath(nmPrivateTokensPath)
.setNmPrivateKeystorePath(nmPrivateKeystorePath)
.setNmPrivateTruststorePath(nmPrivateTruststorePath)
.setUser(user)
.setAppId(appIdStr)
.setContainerWorkDir(containerWorkDir)
.setContainerCsiVolumesRootDir(csiVolumesRoot)
.setLocalDirs(localDirs)
.setLogDirs(logDirs)
.setFilecacheDirs(filecacheDirs)
.setUserLocalDirs(userLocalDirs)
.setContainerLocalDirs(containerLocalDirs)
.setContainerLogDirs(containerLogDirs)
.setUserFilecacheDirs(userFilecacheDirs)
.setApplicationLocalDirs(applicationLocalDirs).build());
} catch (ConfigurationException e) {
// A configuration error is reported as a failed container exit and is
// also passed to the node status updater.
LOG.error("Failed to launch container due to configuration error.", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent(
containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
e.getMessage()));
// Mark the node as unhealthy
context.getNodeStatusUpdater().reportException(e);
return ret;
} catch (Throwable e) {
// Any other failure is reported as a failed container exit.
LOG.warn("Failed to launch container.", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent(
containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
e.getMessage()));
return ret;
} finally {
// Runs on every way out of the try, including the returns in the catch
// blocks above.
setContainerCompletedStatus(ret);
}
// Reached only when no exception was thrown inside the try block.
handleContainerExitCode(ret, containerLogDir);
return ret;
}
/**
 * Start a list of containers on this NodeManager.
 *
 * <p>Requests are processed one by one while holding the lock on the NM
 * context. A {@code YarnException} for one request is recorded as a failure
 * for that container and processing continues with the next request. An
 * {@code InvalidToken} is recorded and rethrown; any other
 * {@code IOException} is converted through
 * {@code RPCUtil.getRemoteException} and thrown.
 *
 * @param requests the batch of start requests
 * @return a response built from the aux service metadata, the ids of the
 *         containers that were started, and the recorded failures
 * @throws YarnException declared by this method's signature
 * @throws IOException if a container token is missing or invalid
 */
@Override
public StartContainersResponse startContainers(
    StartContainersRequest requests) throws YarnException, IOException {
  UserGroupInformation callerUgi = getRemoteUgi();
  NMTokenIdentifier nmTokenId = selectNMTokenIdentifier(callerUgi);
  authorizeUser(callerUgi, nmTokenId);

  List<ContainerId> started = new ArrayList<>();
  Map<ContainerId, SerializedException> failed = new HashMap<>();

  // Synchronize with NodeStatusUpdaterImpl#registerWithRM
  // to avoid race condition during NM-RM resync (due to RM restart) while a
  // container is being started, in particular when the container has not yet
  // been added to the containers map in NMContext.
  synchronized (this.context) {
    for (StartContainerRequest startRequest
        : requests.getStartContainerRequests()) {
      // Stays null until the token has been verified, so failures recorded
      // before that point are stored under a null key.
      ContainerId containerId = null;
      try {
        if (startRequest.getContainerToken() == null
            || startRequest.getContainerToken().getIdentifier() == null) {
          throw new IOException(INVALID_CONTAINERTOKEN_MSG);
        }

        ContainerTokenIdentifier tokenId =
            BuilderUtils.newContainerTokenIdentifier(
                startRequest.getContainerToken());
        verifyAndGetContainerTokenIdentifier(
            startRequest.getContainerToken(), tokenId);
        containerId = tokenId.getContainerID();

        // Initialize the AMRMProxy service instance only if the container is
        // of type AM and if the AMRMProxy service is enabled
        if (amrmProxyEnabled) {
          if (tokenId.getContainerType()
              .equals(ContainerType.APPLICATION_MASTER)) {
            forwardAMRequest(startRequest, containerId);
          }
        }

        performContainerPreStartChecks(nmTokenId, startRequest, tokenId);
        startContainerInternal(tokenId, startRequest);
        started.add(containerId);
      } catch (YarnException e) {
        failed.put(containerId, SerializedException.newInstance(e));
      } catch (InvalidToken ie) {
        failed.put(containerId, SerializedException.newInstance(ie));
        throw ie;
      } catch (IOException e) {
        throw RPCUtil.getRemoteException(e);
      }
    }

    return StartContainersResponse.newInstance(
        getAuxServiceMetaData(), started, failed);
  }
}
- 点赞
- 收藏
- 关注作者
评论(0)