[Hadoop][Yarn] How the AM Is Launched
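As we know, running any Yarn job involves two core processes:
1- The ResourceManager selects a NodeManager node on which to run the AM container (the first container of an Application).
ResourceManager -> NodeManager (the call is initiated by the ResourceManager)
2- Once the AM is running, it requests containers for the job's tasks from the ResourceManager and asks NodeManagers to start them.
AppMaster -> NodeManager (the call is initiated by the AppMaster)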
The first process takes place inside the ResourceManager and involves an important class, ApplicationMasterLauncher:
public class ApplicationMasterLauncher
extends AbstractService
implements EventHandler<AMLauncherEvent>
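As the definition shows, it is both a Service and an event handler. (Other YARN components that appear later in this article, such as CapacityScheduler and LogAggregationService, also combine these two roles.)
First, let's look at it as a Service: what does this Service do? By convention, Hadoop wraps a fairly complex, self-contained module as a Service, which typically runs a thread of its own (with some exceptions). So what does the ApplicationMasterLauncher thread do?
It simply runs a thread named LauncherThread, whose main job is to process AppMaster events: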
public void run() {
while (!this.isInterrupted()) {
Runnable toLaunch;
try {
toLaunch = masterEvents.take();
launcherPool.execute(toLaunch);
} catch (InterruptedException e) {
LOG.warn(this.getClass().getName() + " interrupted. Returning.");
return;
}
}
}
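Each item in the masterEvents queue is a Runnable, not a thread. LauncherThread takes each one off the queue and hands it to the launcherPool thread pool for execution. ApplicationMasterLauncher puts these Runnables into the queue when it receives events.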
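In this pattern, one thread takes Runnables off a blocking queue and hands them to a pool. It can be sketched in plain Java. This is a minimal sketch, not the Hadoop implementation. The class MiniLauncher, its methods, and the pool size of 2 are hypothetical. Only the names masterEvents, launcherPool and LauncherThread are borrowed from the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the LauncherThread pattern (not Hadoop code):
// producers enqueue Runnables, one thread takes them off the queue,
// and a thread pool executes them.
final class MiniLauncher {
    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(2);
    private final Thread launcherThread = new Thread(this::loop, "LauncherThread");

    private void loop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // take() blocks until a task is available.
                launcherPool.execute(masterEvents.take());
            } catch (InterruptedException e) {
                return; // stop() interrupts the thread: leave the loop
            }
        }
    }

    void start() { launcherThread.start(); }

    // Producer side: what handle() does for each incoming event.
    void submit(Runnable task) { masterEvents.add(task); }

    void stop() throws InterruptedException {
        launcherThread.interrupt();
        launcherThread.join();
        launcherPool.shutdown();
        launcherPool.awaitTermination(5, TimeUnit.SECONDS);
    }

    // Pushes n trivial tasks through the launcher and returns how many ran.
    static int runTasks(int n) {
        MiniLauncher launcher = new MiniLauncher();
        AtomicInteger ran = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(n);
        launcher.start();
        for (int i = 0; i < n; i++) {
            launcher.submit(() -> {
                ran.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
            launcher.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3)); // prints 3
    }
}
```

The take() loop stays on a single thread, and the pool runs the individual launches. Launches involve RPC calls to NodeManagers, so a slow launch does not stall the queue.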
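The ResourceManager's initialization code contains the following snippet:
rmDispatcher.register(AMLauncherEventType.class, applicationMasterLauncher);
This registers ApplicationMasterLauncher with the central dispatcher as the handler for AMLauncherEventType events. Whenever an event of this type arrives, ApplicationMasterLauncher.handle() is called to process it.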
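The routing that register() sets up can be illustrated with a small, synchronous dispatcher. It looks up handlers by the enum class of an event's type. This is only a sketch in the spirit of the central dispatcher. The real AsyncDispatcher queues events and delivers them on its own thread. The names MiniDispatcher, Event and demo() are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, synchronous sketch of routing by event-type enum class,
// in the spirit of register(AMLauncherEventType.class, handler).
final class MiniDispatcher {
    enum AMLauncherEventType { LAUNCH, CLEANUP }

    static final class Event {
        final Enum<?> type;
        final String attemptId;
        Event(Enum<?> type, String attemptId) {
            this.type = type;
            this.attemptId = attemptId;
        }
    }

    interface EventHandler { void handle(Event event); }

    private final Map<Class<?>, EventHandler> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventTypeClass, EventHandler handler) {
        handlers.put(eventTypeClass, handler);
    }

    void dispatch(Event event) {
        // getDeclaringClass() maps both LAUNCH and CLEANUP to AMLauncherEventType.class.
        EventHandler handler = handlers.get(event.type.getDeclaringClass());
        if (handler == null) {
            throw new IllegalStateException("No handler for " + event.type);
        }
        handler.handle(event);
    }

    static List<String> demo() {
        MiniDispatcher dispatcher = new MiniDispatcher();
        List<String> seen = new ArrayList<>();
        dispatcher.register(AMLauncherEventType.class,
                e -> seen.add(e.type + " " + e.attemptId));
        dispatcher.dispatch(new Event(AMLauncherEventType.LAUNCH, "attempt_1"));
        dispatcher.dispatch(new Event(AMLauncherEventType.CLEANUP, "attempt_1"));
        return seen;
    }
}
```

One registration per enum class is enough to cover every constant of that enum. That is why a single register() call covers both LAUNCH and CLEANUP.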
AMLauncherEventType defines only two AM-related events:
public enum AMLauncherEventType {
LAUNCH,
CLEANUP
}
ApplicationMasterLauncher.handle() processes these events very simply. As an event handler, it only puts the received AM launch (LAUNCH) and cleanup (CLEANUP) requests into the event queue, and ApplicationMasterLauncher.LauncherThread then processes them.
So where is this event fired? It is fired on the following transition of the state machine in RMAppAttemptImpl:
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
To see how an attempt gets there, start with the state machine in RMAppImpl:
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
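This transition definition means the following. If the app is currently in the SUBMITTED state and receives an APP_ACCEPTED event, the StartAppAttemptTransition hook is invoked and the app's state changes to ACCEPTED.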
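An addTransition() entry is essentially a row in a lookup table keyed by (current state, event type). The hypothetical MiniStateMachine below sketches that idea with the two states and the event used above. It is not Hadoop's StateMachineFactory, whose hooks also receive the operand and the event. It models only the single-arc case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an addTransition()-style table:
// (current state, event type) -> (next state, hook). Not StateMachineFactory.
final class MiniStateMachine {
    enum AppState { SUBMITTED, ACCEPTED }
    enum AppEventType { APP_ACCEPTED }

    private static final class Arc {
        final AppState next;
        final Runnable hook;
        Arc(AppState next, Runnable hook) {
            this.next = next;
            this.hook = hook;
        }
    }

    private final Map<AppState, Map<AppEventType, Arc>> table = new HashMap<>();
    private AppState current;

    MiniStateMachine(AppState initial) {
        this.current = initial;
    }

    MiniStateMachine addTransition(AppState from, AppState to,
                                   AppEventType event, Runnable hook) {
        table.computeIfAbsent(from, k -> new HashMap<>()).put(event, new Arc(to, hook));
        return this;
    }

    AppState handle(AppEventType event) {
        Arc arc = table.getOrDefault(current, Map.of()).get(event);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + current);
        }
        arc.hook.run();      // the hook runs first (the role of StartAppAttemptTransition)
        current = arc.next;  // then the state is updated
        return current;
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniStateMachine app = new MiniStateMachine(AppState.SUBMITTED)
                .addTransition(AppState.SUBMITTED, AppState.ACCEPTED,
                        AppEventType.APP_ACCEPTED, () -> log.add("start attempt"));
        log.add(app.handle(AppEventType.APP_ACCEPTED).name());
        return log;
    }
}
```

A second APP_ACCEPTED arriving in the ACCEPTED state finds no row and is rejected. This mirrors how an undefined (state, event) pair is treated as an invalid transition.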
So where is the APP_ACCEPTED event (which moves the app to RMAppState.ACCEPTED) fired?
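It is fired in the scheduler. The Capacity Scheduler serves as the example here; the other schedulers work in much the same way.
CapacityScheduler, as an event handler, receives SchedulerEvent events. Its handle() method defines how each event type is processed; here we care about the APP_ADDED event.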
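CapacityScheduler.addApplication() is then called. It mainly does the following:
1- checks whether the target queue has reached its application limit;
2- checks whether the target queue is valid (whether it exists and whether it is a leaf queue);
3- if the queue does not exist, checks whether auto-creation is enabled: getQueuePrefix(queuePath)+auto-create-child-queue.enabled
Queues configured for auto-creation can be created automatically. This mainly covers cases where some queues were removed without the user noticing.
This feature was merged into the Capacity Scheduler in 2017.
4- once all of the checks above pass, the application is actually submitted to the queue;
5- finally, it fires an RMAppEvent through the central dispatcher: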
rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
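The checks listed above can be summarized as a small decision function. This is a sketch with several assumptions. QueueInfo, Result and check() are hypothetical. The checks are ordered so that the application limit is only evaluated on a queue that exists. The auto-create flag is looked up on the parent queue of the requested path.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the admission checks described above; queue
// metadata is modelled with plain collections, not CapacityScheduler types.
final class AdmissionCheck {
    enum Result { ACCEPTED, AUTO_CREATED, NO_SUCH_QUEUE, NOT_LEAF, QUEUE_FULL }

    static final class QueueInfo {
        final boolean leaf;
        final int activeApps;
        final int maxApps;
        QueueInfo(boolean leaf, int activeApps, int maxApps) {
            this.leaf = leaf;
            this.activeApps = activeApps;
            this.maxApps = maxApps;
        }
    }

    static Result check(Map<String, QueueInfo> queues,
                        Set<String> autoCreateParents, String queuePath) {
        QueueInfo queue = queues.get(queuePath);
        if (queue == null) {
            // Assumption of this sketch: the auto-create flag lives on the parent queue.
            int dot = queuePath.lastIndexOf('.');
            String parent = dot < 0 ? "" : queuePath.substring(0, dot);
            return autoCreateParents.contains(parent)
                    ? Result.AUTO_CREATED : Result.NO_SUCH_QUEUE;
        }
        if (!queue.leaf) {
            return Result.NOT_LEAF;      // applications can only go to leaf queues
        }
        if (queue.activeApps >= queue.maxApps) {
            return Result.QUEUE_FULL;    // application limit reached
        }
        return Result.ACCEPTED;          // next: submit to the queue, then fire APP_ACCEPTED
    }
}
```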
Once the central dispatcher receives this event, it delivers it to the RMAppImpl state machine, which defines:
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
The state machine then invokes the hook StartAppAttemptTransition.transition(), which calls RMAppImpl.createAndStartNewAttempt():
private void createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
createNewAttempt();
handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
transferStateFromPreviousAttempt));
}
This method:
1- creates an AppAttempt for the Application;
2- fires an RMAppStartAttemptEvent.
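RMAppStartAttemptEvent carries the START event type. In the RMAppAttemptImpl state machine, START moves the attempt from NEW to SUBMITTED through AttemptStartedTransition, which registers the attempt with the scheduler. The scheduler then replies with an ATTEMPT_ADDED event, which is handled by this transition: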
.addTransition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.ATTEMPT_ADDED,
new ScheduleTransition())
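ScheduleTransition.transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event)
A transition function receives two arguments: the object that owns the state machine (here, the attempt) and the event that triggered the transition. ScheduleTransition asks the scheduler to allocate the AM container. Once the scheduler has allocated it, a CONTAINER_ALLOCATED event moves the attempt from SCHEDULED to ALLOCATED_SAVING through AMContainerAllocatedTransition, which calls RMAppAttemptImpl.storeAttempt():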
private void storeAttempt() {
// store attempt data in a non-blocking manner to prevent dispatcher
// thread starvation and wait for state to be saved
LOG.info("Storing attempt: AppId: " +
getAppAttemptId().getApplicationId()
+ " AttemptId: " +
getAppAttemptId()
+ " MasterContainer: " + masterContainer);
rmContext.getStateStore().storeNewApplicationAttempt(this);
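}
storeAttempt() passes the attempt to RMStateStore.storeNewApplicationAttempt(), which does not block the dispatcher thread: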
/**
* Non-blocking API
* ResourceManager services call this to store state on an application attempt
* This does not block the dispatcher threads
* RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
*/
public void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
RMAppAttemptMetrics attempMetrics = appAttempt.getRMAppAttemptMetrics();
AggregateAppResourceUsage resUsage =
attempMetrics.getAggregateAppResourceUsage();
ApplicationAttemptStateData attemptState =
ApplicationAttemptStateData.newInstance(
appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(),
credentials, appAttempt.getStartTime(),
resUsage.getResourceUsageSecondsMap(),
attempMetrics.getPreemptedResourceSecondsMap(),
attempMetrics.getTotalAllocatedContainers());
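getRMStateStoreEventHandler().handle(new RMStateStoreAppAttemptEvent(attemptState));
}
The RMStateStoreAppAttemptEvent (event type STORE_APP_ATTEMPT) is then handled by RMStateStore's own state machine, in StoreAppAttemptTransition: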
private static class StoreAppAttemptTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppAttemptEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
ApplicationAttemptStateData attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptState);
//fire an ATTEMPT_NEW_SAVED event
store.notifyApplicationAttempt(new RMAppAttemptEvent
(attemptState.getAttemptId(),
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
};
}
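The javadoc above describes a contract: the caller does not block, and completion is reported later through an ATTEMPT_NEW_SAVED event. That contract can be sketched with a CompletableFuture running on a single store thread. AsyncStore and its methods are hypothetical. Unlike StoreAppAttemptTransition, the sketch does not handle store failures (the real code calls notifyStoreOperationFailedInternal()).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical sketch of "store asynchronously, then notify": the caller
// returns at once; a separate store thread persists the data and then
// reports ATTEMPT_NEW_SAVED back.
final class AsyncStore {
    private final Map<String, String> persisted = new ConcurrentHashMap<>();
    private final ExecutorService storeThread = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> storeNewAttempt(String attemptId, String state,
                                            BiConsumer<String, String> notifier) {
        return CompletableFuture.runAsync(() -> {
            persisted.put(attemptId, state);                  // ~ storeApplicationAttemptStateInternal()
            notifier.accept(attemptId, "ATTEMPT_NEW_SAVED");  // ~ notifyApplicationAttempt()
        }, storeThread);
    }

    void close() {
        storeThread.shutdown();
    }

    static String demo() {
        AsyncStore store = new AsyncStore();
        StringBuffer events = new StringBuffer(); // thread-safe: written by the store thread
        store.storeNewAttempt("appattempt_1_0001_000001", "masterContainer=c1",
                (id, event) -> events.append(id).append(' ').append(event))
             .join(); // only the demo waits; a dispatcher thread would not
        store.close();
        return events.toString();
    }
}
```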
Back in the RMAppAttemptImpl state machine, ATTEMPT_NEW_SAVED is handled by this transition:
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
RMAppAttemptImpl
private static final class AttemptStoredTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
appAttempt.registerClientToken();
appAttempt.launchAttempt();
}
}
private void launchAttempt(){
launchAMStartTime = System.currentTimeMillis();
// Send event to launch the AM Container
eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
}
This is where the LAUNCH event is finally fired.
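The happy path of a managed AM attempt, traced above, can be modelled as a simple linear walk through RMAppAttemptImpl states. This is a simplified, hypothetical model, and AttemptLifecycle is not a Hadoop class. The state and event names follow RMAppAttemptImpl (NEW through LAUNCHED). Failure, kill and unmanaged-AM arcs are left out.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, happy-path-only model of the RMAppAttemptImpl transitions
// traced in this article; failure and kill arcs are omitted.
final class AttemptLifecycle {
    // Declared in happy-path order, so the next state is simply ordinal + 1.
    enum State { NEW, SUBMITTED, SCHEDULED, ALLOCATED_SAVING, ALLOCATED, LAUNCHED }
    enum Event { START, ATTEMPT_ADDED, CONTAINER_ALLOCATED, ATTEMPT_NEW_SAVED, LAUNCHED }

    // The single event that moves each state forward, and who sends it.
    private static final Map<State, Event> FORWARD = new EnumMap<>(State.class);
    static {
        FORWARD.put(State.NEW, Event.START);                          // RMAppImpl
        FORWARD.put(State.SUBMITTED, Event.ATTEMPT_ADDED);            // scheduler
        FORWARD.put(State.SCHEDULED, Event.CONTAINER_ALLOCATED);      // after AM container allocation
        FORWARD.put(State.ALLOCATED_SAVING, Event.ATTEMPT_NEW_SAVED); // RMStateStore
        FORWARD.put(State.ALLOCATED, Event.LAUNCHED);                 // AMLauncher.run()
    }

    static State next(State state, Event event) {
        if (FORWARD.get(state) != event) {
            throw new IllegalStateException(event + " is not expected in state " + state);
        }
        return State.values()[state.ordinal() + 1];
    }

    static List<State> walk(Event... events) {
        List<State> path = new ArrayList<>();
        State state = State.NEW;
        path.add(state);
        for (Event event : events) {
            state = next(state, event);
            path.add(state);
        }
        return path;
    }
}
```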
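The ResourceManager's central dispatcher has ApplicationMasterLauncher registered as the handler for AMLauncherEventType events:
rmDispatcher.register(AMLauncherEventType.class, applicationMasterLauncher);
so the event is finally processed in ApplicationMasterLauncher.handle().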
That raises the question: how does the event object above become something a thread can run?
ApplicationMasterLauncher.launch()
private void launch(RMAppAttempt application) {
Runnable launcher = createRunnableLauncher(application,
AMLauncherEventType.LAUNCH);
masterEvents.add(launcher);
}
protected Runnable createRunnableLauncher(RMAppAttempt application,
AMLauncherEventType event) {
Runnable launcher =
new AMLauncher(context, application, event, getConfig());
return launcher;
}
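As the code above shows, handle() does not do the work itself. It wraps the attempt and the event type into a Runnable (AMLauncher in the real code) and queues it, a command-object pattern. The hypothetical EventToRunnable class below sketches that producer side. A synchronous drainAndRun() stands in for LauncherThread and the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the producer side: handle() wraps each event in a
// Runnable (a command object) and queues it; nothing runs until drained.
final class EventToRunnable {
    enum AMLauncherEventType { LAUNCH, CLEANUP }

    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    // Not thread-safe: fine here because drainAndRun() runs tasks on one thread.
    private final List<String> actions = new ArrayList<>();

    // Plays the role of createRunnableLauncher() building an AMLauncher.
    private Runnable createRunnableLauncher(String attemptId, AMLauncherEventType type) {
        return () -> {
            switch (type) {
                case LAUNCH:
                    actions.add("launch " + attemptId);
                    break;
                case CLEANUP:
                    actions.add("cleanup " + attemptId);
                    break;
                default:
                    break;
            }
        };
    }

    synchronized void handle(String attemptId, AMLauncherEventType type) {
        masterEvents.add(createRunnableLauncher(attemptId, type));
    }

    // Stand-in for LauncherThread plus the pool: run queued tasks in order.
    List<String> drainAndRun() {
        List<Runnable> batch = new ArrayList<>();
        masterEvents.drainTo(batch);
        batch.forEach(Runnable::run);
        return new ArrayList<>(actions);
    }
}
```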
AMLauncher.run()
public void run() {
switch (eventType) {
case LAUNCH:
try {
LOG.info("Launching master" + application.getAppAttemptId());
launch();
handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED));
} catch(Exception ie) {
onAMLaunchFailed(masterContainer.getId(), ie);
}
break;
case CLEANUP:
try {
LOG.info("Cleaning master " + application.getAppAttemptId());
cleanup();
} catch(IOException ie) {
LOG.info("Error cleaning master ", ie);
} catch (YarnException e) {
StringBuilder sb = new StringBuilder("Container ");
sb.append(masterContainer.getId().toString());
sb.append(" is not handled by this NodeManager");
if (!e.getMessage().contains(sb.toString())) {
// Ignoring if container is already killed by Node Manager.
LOG.info("Error cleaning master ", e);
}
}
break;
default:
LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
break;
}
}
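AMLauncher.launch()
---(RPC over ContainerManagementProtocol) ContainerManagerImpl.startContainers() on the NodeManager
------ContainerManagerImpl.startContainerInternal()
The core snippet of startContainerInternal() is below. When the NodeManager receives the first container of an application, it creates the Application object and fires its initialization event: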
if (!context.getApplications().containsKey(applicationID)) {
// Create the application
// populate the flow context from the launch context if the timeline
// service v.2 is enabled
FlowContext flowContext =
getFlowContext(launchContext, applicationID);
Application application = new ApplicationImpl(dispatcher, user, flowContext, applicationID, credentials, context);
if (context.getApplications().putIfAbsent(applicationID, application) == null) {
metrics.runningApplication();
LogAggregationContext logAggregationContext = containerTokenIdentifier.getLogAggregationContext();
Map<ApplicationAccessType, String> appAcls = container.getLaunchContext().getApplicationACLs();
context.getNMStateStore().storeApplication(applicationID, buildAppProto(applicationID, user, credentials, appAcls, logAggregationContext, flowContext));
//fire an ApplicationInitEvent
dispatcher.getEventHandler().handle(new ApplicationInitEvent(applicationID, appAcls, logAggregationContext));
}
}
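The containsKey()/putIfAbsent() pair above ensures that only the first container of an application on this NodeManager creates the Application and fires ApplicationInitEvent. This holds even if several containers of the same application start concurrently. Below is a minimal sketch of that idiom. A plain Object stands in for ApplicationImpl, and an AtomicInteger counts init events. NodeApps and its methods are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "first container of an app creates the app" idiom.
final class NodeApps {
    private final ConcurrentMap<String, Object> applications = new ConcurrentHashMap<>();
    private final AtomicInteger initEvents = new AtomicInteger();

    void startContainer(String appId) {
        if (!applications.containsKey(appId)) {   // cheap check first
            Object app = new Object();            // stands in for ApplicationImpl
            // putIfAbsent returns null only for the one caller that inserted,
            // so the init event fires once even if containers race here.
            if (applications.putIfAbsent(appId, app) == null) {
                initEvents.incrementAndGet();     // ~ firing ApplicationInitEvent
            }
        }
        // ... the container itself would be started here on every call
    }

    int initEventCount() {
        return initEvents.get();
    }

    static int raceDemo(int threads) {
        NodeApps nm = new NodeApps();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> nm.startContainer("application_1_0001"));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return nm.initEventCount();
    }
}
```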
The state machine in ApplicationImpl:
// Transitions from NEW state
.addTransition(ApplicationState.NEW, ApplicationState.INITING, ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
The ApplicationInitEvent constructor sets the event type to INIT_APPLICATION:
public ApplicationInitEvent(ApplicationId appId, Map<ApplicationAccessType, String> acls, LogAggregationContext logAggregationContext) {
super(appId, ApplicationEventType.INIT_APPLICATION);
this.applicationACLs = acls;
this.logAggregationContext = logAggregationContext;
}
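The ApplicationImpl transition definition shows what happens when an application in the NEW state receives an ApplicationEventType.INIT_APPLICATION event. AppInitTransition.transition() is called and the state moves to INITING.
A transition can have two kinds of effects:
1- moving the state machine to another state;
2- dispatching further events to event handlers;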
static class AppInitTransition implements SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
app.applicationACLs = initEvent.getApplicationACLs();
app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
// Inform the logAggregator
app.logAggregationContext = initEvent.getLogAggregationContext();
//fire a LogHandlerAppStartedEvent
app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, app.applicationACLs,
app.logAggregationContext, app.applicationLogInitedTimestamp));
}
}
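So where is the LogHandlerAppStartedEvent fired here handled?
An event can either be captured by a state machine, causing a state transition, or be processed directly by an event handler.
This one is processed directly by an event handler: ContainerManagerImpl registers LogHandlerEventType with its central dispatcher: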
public void serviceInit(Configuration conf) throws Exception {
logHandler = createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
// add the shared cache upload service (it will do nothing if the shared cache is disabled)
SharedCacheUploadService sharedCacheUploader = createSharedCacheUploaderService();
addService(sharedCacheUploader);
dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
createAMRMProxyService(conf);
super.serviceInit(conf);
recover();
}
The concrete handler is logHandler, which has more than one implementation:
protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
DiskSpaceBasedLogAggregationService spaceBasedAgg = new DiskSpaceBasedLogAggregationService(this.dispatcher, context);
addIfService(spaceBasedAgg);
return new LogAggregationService(this.dispatcher, context, deletionService, dirsHandler);
} else {
return new NonAggregatingLogHandler(this.dispatcher, deletionService, dirsHandler, context.getNMStateStore());
}
}
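Depending on whether log aggregation is enabled, Hadoop provides two different log handlers for log events:
1-LogAggregationService: log aggregation enabled
2-NonAggregatingLogHandler: log aggregation disabled
Here we only discuss the log aggregation case. From this point on, the call chain is straightforward: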
LogAggregationService.handle()
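//LogAggregationService is the NodeManager-side service that handles log aggregation events. Each time an application's APPLICATION_STARTED event arrives, it makes the calls below.
//Its main job is to create a log aggregator, AppLogAggregatorImpl, for the current app. Each application has its own aggregator, so applications do not affect each other.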
---LogAggregationService.initApp()
------LogAggregationService.initAppAggregator()
---------AppLogAggregatorImpl.run()
------------AppLogAggregatorImpl.doAppLogAggregation()
---------------AppLogAggregatorImpl.uploadLogsForContainers()
------------------AppLogAggregatorImpl.ContainerLogAggregator.ContainerLogAggregator()
---------------------AppLogAggregatorImpl.ContainerLogAggregator.getRetentionContext()
class ContainerLogAggregator {
private final AggregatedLogFormat.LogRetentionContext retentionContext;
private final ContainerId containerId;
private Set<String> uploadedFileMeta = new HashSet<String>();
public ContainerLogAggregator(ContainerId containerId) {
this.containerId = containerId;
this.retentionContext = getRetentionContext();
}
private AggregatedLogFormat.LogRetentionContext getRetentionContext() {
final long logRetentionSecs =
conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
return new AggregatedLogFormat.LogRetentionContext(
recoveredLogInitedTime, logRetentionSecs * 1000);
}
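OK, that covers two things. On the ResourceManager side, an application goes through a series of state transitions and finally sends an AM container launch request to a NodeManager. On the NodeManager side, a series of events and state transitions eventually creates a log aggregation thread for the application.
Next, let's focus on how the log aggregation thread works.
1- Check whether rolling log aggregation is enabled for the application that this aggregation thread monitors:
yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds
This defines how often the NodeManager wakes up and uploads logs. Setting it to -1 or 0 disables rolling monitoring, and logs are aggregated after the application finishes. The minimum interval is 3600 seconds: any value greater than 0 but less than 3600 is treated as 3600.
The feature is disabled by default, which means logs are uploaded only after the application has finished.
2- Upload the logs;
3- Clean up the local logs. However, if a retention time for failed-job logs is configured, those logs cannot be deleted at this point; a dedicated cleanup thread deletes them periodically.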
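The interval rule for yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds described above can be written as a tiny helper. RollInterval.effective() is hypothetical. The 3600-second floor comes from the description above, not from Hadoop's source.

```java
// Hypothetical helper mirroring the rule described above for
// yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds.
final class RollInterval {
    static final long MIN_SECONDS = 3600; // floor taken from the description above

    /** Returns the effective interval in seconds, or -1 if rolling is disabled. */
    static long effective(long configuredSeconds) {
        if (configuredSeconds <= 0) {
            return -1; // -1 or 0: upload only once the application finishes
        }
        return Math.max(configuredSeconds, MIN_SECONDS); // values in (0, 3600) become 3600
    }
}
```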
Once it has been decided to upload, the upload logic is as follows:
private void uploadLogsForContainers(boolean appFinished)
throws LogAggregationDFSException {
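//Return immediately if log aggregation is disabled, or if the app is still running and a local-log deletion from an earlier cycle is still in progress.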
if (this.logAggregationDisabled || (deletionInProgress && !appFinished)) {
return;
}
addCredentials();
// Create a set of Containers whose logs will be uploaded in this cycle.
// It includes:
// a) all containers in pendingContainers: those containers are finished
// and satisfy the ContainerLogAggregationPolicy.
// b) some set of running containers: For all the Running containers,
// we use exitCode of 0 to find those which satisfy the
// ContainerLogAggregationPolicy.
Set<ContainerId> pendingContainerInThisCycle = new HashSet<ContainerId>();
//A container is added to pendingContainers when its stop-container (container finished) event is received. All containers waiting for aggregation are drained from it into pendingContainerInThisCycle.
this.pendingContainers.drainTo(pendingContainerInThisCycle);
Set<ContainerId> finishedContainers = new HashSet<ContainerId>(pendingContainerInThisCycle);
if (this.context.getApplications().get(this.appId) != null) {
for (Container container : this.context.getApplications()
.get(this.appId).getContainers().values()) {
ContainerType containerType = container.getContainerTokenIdentifier().getContainerType();
//Exit code 0 is used here to represent a running container. pendingContainerInThisCycle so far holds only finished containers, so running containers that satisfy the policy are added as well.
if (shouldUploadLogs(new ContainerLogContext(container.getContainerId(), containerType, 0))) {
pendingContainerInThisCycle.add(container.getContainerId());
}
}
}
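//The steps above filter all containers of this application and decide, based on the configured aggregation policy, which ones to upload.
//Note: pendingContainers only ever holds finished containers, but running containers that pass the policy check (with exit code 0) are added too,
//so a cycle can upload logs of containers that are still running, for example under rolling aggregation.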
if (pendingContainerInThisCycle.isEmpty()) {
sendLogAggregationReport(true, "", appFinished);
return;
}
logAggregationTimes++;
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
//Local log deletion tasks: after a successful upload, the corresponding local logs are deleted
List<DeletionTask> deletionTasks = new ArrayList<>();
Exception exc = null;
try {
try {
//logAggregationFileController is the component that performs the upload
logAggregationFileController.initializeWriter(logControllerContext);
} catch (IOException e1) {
logAggregationSucceedInThisCycle = false;
LOG.error("Cannot create writer for app " + this.applicationId + ". Skip log upload this time. ", e1);
return;
}
//Everything above determines which containers to upload. Below, an aggregator is obtained for each of them, or created and cached in containerLogAggregators.
boolean uploadedLogsInThisCycle = false;
for (ContainerId container : pendingContainerInThisCycle) {
ContainerLogAggregator aggregator = null;
if (containerLogAggregators.containsKey(container)) {
aggregator = containerLogAggregators.get(container);
} else {
aggregator = new ContainerLogAggregator(container);
containerLogAggregators.put(container, aggregator);
}
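//Each container has several log files. The files uploaded in this cycle are returned and their paths are added to a deletion task,
//so the local copies are deleted after the upload succeeds.
//This call is where the actual upload happens.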
Set<Path> uploadedFilePathsInThisCycle = aggregator.doContainerLogAggregation(logAggregationFileController, appFinished, finishedContainers.contains(container));
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
deletionTasks.add(new FileDeletionCallBackTask(delService, this.userUgi.getShortUserName(), null, uploadedFilePathsInThisCycleList, getCallBackTask()));
deletionInProgress = true;
}
// This container is finished, and all its logs have been uploaded,
// remove it from containerLogAggregators.
if (finishedContainers.contains(container)) {
containerLogAggregators.remove(container);
}
}
logControllerContext.setUploadedLogsInThisCycle(uploadedLogsInThisCycle);
logControllerContext.setLogUploadTimeStamp(System.currentTimeMillis());
logControllerContext.increLogAggregationTimes();
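//Post-upload step: finalize the written file. The deletion tasks are submitted in the finally block, and only if this cycle succeeded.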
try {
this.logAggregationFileController.postWrite(logControllerContext);
} catch (Exception e) {
renameTemporaryLogFileFailed = true;
logAggregationSucceedInThisCycle = false;
exc = e;
}
} finally {
if (logAggregationSucceedInThisCycle) {
for (DeletionTask deletionTask : deletionTasks) {
delService.delete(deletionTask);
}
}
}
}
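The container-selection part of uploadLogsForContainers() boils down to simple set operations. In this hypothetical sketch, a Predicate stands in for shouldUploadLogs() on a ContainerLogContext with exit code 0, and container IDs are plain strings.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.function.Predicate;

// Hypothetical sketch of how one upload cycle picks its containers.
final class UploadCycle {
    static Set<String> pick(BlockingQueue<String> pendingFinished,
                            List<String> appContainers,
                            Predicate<String> policyAcceptsRunning) {
        Set<String> thisCycle = new HashSet<>();
        // Move every finished container into this cycle; the queue is left empty.
        pendingFinished.drainTo(thisCycle);
        // Add the app's other containers if the policy accepts them as running.
        for (String container : appContainers) {
            if (policyAcceptsRunning.test(container)) {
                thisCycle.add(container);
            }
        }
        return thisCycle;
    }
}
```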
public Set<Path> doContainerLogAggregation(
LogAggregationFileController logAggregationFileController,
boolean appFinished, boolean containerFinished) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirsForRead()));
final LogKey logKey = new LogKey(containerId);
final LogValue logValue = new LogValue(dirsHandler.getLogDirsForRead(), containerId, userUgi.getShortUserName(), logAggregationContext,
this.uploadedFileMeta, retentionContext, appFinished, containerFinished);
try {
logAggregationFileController.write(logKey, logValue);
} catch (Exception e) {
LOG.error("Couldn't upload logs for " + containerId + ". Skipping this container.", e);
return new HashSet<Path>();
}
this.uploadedFileMeta.addAll(logValue.getCurrentUpLoadedFileMeta());
// if any of the previous uploaded logs have been deleted,
// we need to remove them from alreadyUploadedLogs
Iterable<String> mask = Iterables.filter(uploadedFileMeta, new Predicate<String>() {
@Override
public boolean apply(String next) {
return logValue.getAllExistingFilesMeta().contains(next);
}
});
this.uploadedFileMeta = Sets.newHashSet(mask);
// need to return files uploaded or older-than-retention clean up.
return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
logValue.getObseleteRetentionLogFiles());
}
}
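The uploadedFileMeta bookkeeping at the end of doContainerLogAggregation() records what was uploaded, then keeps only entries whose files still exist locally. That is equivalent to an addAll() followed by a retainAll(). Presumably it exists so that files uploaded in an earlier rolling cycle are not uploaded again. Below is a hypothetical sketch with plain string keys. The real metadata strings are built by LogValue, and their format is not reproduced here.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the uploadedFileMeta bookkeeping.
final class UploadedMeta {
    private final Set<String> uploadedFileMeta = new HashSet<>();

    // Records this cycle's uploads, then forgets entries whose local files are gone,
    // so the set does not keep growing with deleted files.
    Set<String> afterCycle(Set<String> uploadedNow, Set<String> stillExisting) {
        uploadedFileMeta.addAll(uploadedNow);
        uploadedFileMeta.retainAll(stillExisting); // same effect as the Iterables.filter mask
        return new HashSet<>(uploadedFileMeta);
    }
}
```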
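Related classes for reading and cleaning up aggregated logs:
AggregatedLogDeletionService (the JobHistoryServer (JHS) module that periodically deletes aggregated logs)
LogAggregationFileControllerFactory.getFileControllerForRead()
LogAggregationIndexedFileController.initInternal()
LogAggregationTFileController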
<name>yarn.log-aggregation.file-formats</name>
<value>TFile</value>
yarn.log-aggregation.TFile.remote-app-log-dir (if unset, falls back to yarn.nodemanager.remote-app-log-dir)
yarn.log-aggregation.TFile.remote-app-log-dir-suffix (if unset, falls back to yarn.nodemanager.remote-app-log-dir-suffix)
log-aggregation.file-controller.%s.class
<name>yarn.log-aggregation.file-controller.TFile.class</name>
<value>org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController</value>
https://192.168.1.1:20026/Mapreduce/JobHistoryServer/44/jobhistory/logs/server-2110082001-0025:26009/container_e01_1639573148594_0005_01_000002/attempt_1639573148594_0005_m_000000_0/testuser
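yarn.log-aggregation.retain-check-interval-seconds: FI configures it to check once a day.
yarn.log-aggregation.retain-seconds
If the check interval is not configured, the check runs at one tenth of the retention time.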
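The fallback for the deletion check interval described above can be expressed as a small helper. RetentionCheck is hypothetical, and the example values (a 7-day retention and a 1-day check interval) are only illustrative.

```java
// Hypothetical helper: how often to scan for expired aggregated logs.
final class RetentionCheck {
    static long checkIntervalMillis(long retainSeconds, long checkIntervalSeconds) {
        if (checkIntervalSeconds > 0) {
            return checkIntervalSeconds * 1000;  // explicitly configured
        }
        return retainSeconds * 1000 / 10;        // unset: 1/10 of the retention time
    }

    public static void main(String[] args) {
        System.out.println(checkIntervalMillis(604800, 86400)); // prints 86400000
        System.out.println(checkIntervalMillis(604800, -1));    // prints 60480000
    }
}
```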