【Hadoop】【Mapreduce】MRAppMaster功能简介
MRAppMaster的作用
MRAppMaster是MapReduce的ApplicationMaster的实现,它使得Mapreduce计算框架可以运行在YARN上面。主要有下面的作用:
1-创建MApreduce作业;
2-向ResourceManager申请资源;
3-和NodeManager通信要求其启动Container;
4-监控作业的运行状态,当任务失败的时候重新启动任务等;
5-作业恢复;
事件驱动模型
Yarn使用了基于事件驱动的异步编程模型,它通过事件将各个组件联系起来,并且由一个中央事件调度器统一将各个事件分配给对应的事件处理器。在Yarn中每一个 组件都是一种事件处理器。
MRAppMaster是一个独立的进程:org.apache.hadoop.mapreduce.v2.app.MRAppMaster#main。由AppClient向Yarn申请Container之后启动。MRAppMaster启动之后,它们会以服务的形式注册到MRAppMaster的中央事件调度器AsyncDispatcher,并且告诉调度器它们处理的事件类型,这样,当出现某一种事件的时候,MRAppMaster会查询自身维护的<事件,事件处理器>映射表,并且将该事件分配给对应的事件处理器。
MRAppMaster各种角色功能
Dispatcher:事件调度器,使用的实现类为AsyncDispatcher。可以向AsyncDispatcher注册事件处理器,它内部有一个阻塞队列用来存放所有事件,由一个线程来通过事件的类型来匹配事件处理器,最终将事件交给相应的事件处理器来处理。它是整个MRAppMaster的引擎。
Job:代表整个MapReduce作业,使用的实现类为JobImpl。它内部有一个状态机,负责处理JobEventType类型的事件并改变Job的状态。
Task:代表一个Map任务或Reduce任务,Map任务使用的实现类为MapTaskImpl,Reduce任务使用的实现类为ReduceTaskImpl。它内部有一个状态机,负责处理TaskEventType类型的事件并改变Task的状态。
TaskAttempt:代表一次Task运行尝试,使用的实现类为TaskAttemptImpl,每个Task可能会有多次运行尝试。它内部有一个状态机,负责处理TaskAttemptEventType类型的事件并改变TaskAttempt的状态。
ContainerAllocator:容器资源分配器。在非Uber作业中采用心跳的方式向Yarn申请Container,并将Container分配给Task。使用的具体实现为RMContainerAllocator,负责处理ContainerAllocator.EventType类型的事件。
ContainerLauncher:容器启动器。负责准备Task启动的运行环境,并向NodeManager发送启动Container的命令。使用的具体实现为ContainerLauncherImpl,负责处理ContainerLauncher.EventType类型的事件。
TaskAttemptListener:Task运行监听器。当Container启动后,Task就开始运行了,Task通过心跳的方式向TaskAttemptListener汇报自身的运行状况。
Speculator:推测执行器。当一个MapReduce作业的某个Task运行速度明显慢于其他Task时,Speculator会为该Task启动一个备份Task,让它与原任务同时处理同一份数据,谁先计算完成则将谁的结果作为最终结果,并将另一个Task杀掉。该机制可有效防止那些“拖后腿”任务拖慢整个作业的执行进度。使用的具体实现为DefaultSpeculator,负责处理Speculator.EventType类型的事件。
JobHistoryEventHandler:Job运行日志记录器。负责对作业的各个事件记录日志,比如作业创建、作业开始运行、Task开始运行等等,这些日志会被写到HDFS的某个目录下,这对于作业恢复非常有用。当MRAppMaster出现故障时,Yarn会将其重新调度到另外一个节点上。为了避免重新计算,MRAppMaster首先会从HDFS上读取上次运行产生的日志,以恢复已经运行完成的任务,进而能够只运行尚未运行完成的任务。同时用户还可以通过这些日志了解作业的运行状态。
ClientService:MRAppMaster提供的客户端服务,AppClient可以通过该服务了解作业的运行状况和杀死作业。
MRAppMaster中的状态机
作业运行流程
下面详细分析下一个Mapreduce作业,从提交到运行结束的整个流程。
有这样的依赖关系
MRAppMaster
public static void main(String[] args) {
try {
mainStarted = true;
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
String containerIdStr =
System.getenv(Environment.CONTAINER_ID.name());
String nodeHostString = System.getenv(Environment.NM_HOST.name());
String nodePortString = System.getenv(Environment.NM_PORT.name());
String nodeHttpPortString =
System.getenv(Environment.NM_HTTP_PORT.name());
String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
validateInputParam(containerIdStr,
Environment.CONTAINER_ID.name());
validateInputParam(nodeHostString, Environment.NM_HOST.name());
validateInputParam(nodePortString, Environment.NM_PORT.name());
validateInputParam(nodeHttpPortString,
Environment.NM_HTTP_PORT.name());
validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV);
ContainerId containerId = ContainerId.fromString(containerIdStr);
ApplicationAttemptId applicationAttemptId =
containerId.getApplicationAttemptId();
if (applicationAttemptId != null) {
CallerContext.setCurrent(new CallerContext.Builder(
"mr_appmaster_" + applicationAttemptId.toString()).build());
}
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
MRAppMaster appMaster =
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), appSubmitTime);
ShutdownHookManager.get().addShutdownHook(
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
JobConf conf = new JobConf(new YarnConfiguration());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
MRWebAppUtil.initialize(conf);
// log the system properties
String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
if (systemPropsToLog != null) {
LOG.info(systemPropsToLog);
}
String jobUserName = System
.getenv(ApplicationConstants.Environment.USER.name());
conf.set(MRJobConfig.USER_NAME, jobUserName);
initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) {
LOG.error("Error starting MRAppMaster", t);
ExitUtil.terminate(1, t);
}
}
MRAppMaster
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
final JobConf conf, String jobUserName) throws IOException,
InterruptedException {
UserGroupInformation.setConfiguration(conf);
// MAPREDUCE-6565: need to set configuration for SecurityUtil.
SecurityUtil.setConfiguration(conf);
// Security framework already loaded the tokens into current UGI, just use
// them
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
LOG.info("Executing with tokens: {}", credentials.getAllTokens());
UserGroupInformation appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName);
appMasterUgi.addCredentials(credentials);
// Now remove the AM->RM token so tasks don't have it
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
Token<?> token = iter.next();
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
iter.remove();
}
}
conf.getCredentials().addAll(credentials);
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
appMaster.init(conf);
appMaster.start();
if(appMaster.errorHappenedShutDown) {
throw new IOException("Was asked to shut down.");
}
return null;
}
});
}
AbstractService
public void start() {
if (isInState(STATE.STARTED)) {
return;
}
//enter the started state
synchronized (stateChangeLock) {
if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
try {
startTime = System.currentTimeMillis();
serviceStart();
if (isInState(STATE.STARTED)) {
//if the service started (and isn't now in a later state), notify
LOG.debug("Service {} is started", getName());
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
MRAppMaster
public class MRAppMaster extends CompositeService {
public void start() {
...
job = createJob(getConfig());//创建Job
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask,ReduceTask
startJobs();//启动作业,这是后续一切动作的触发之源
...
}
protected Job createJob(Configuration conf) {
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
}
JobImpl会接收到.JOB_INIT事件,然后触发作业状态从NEW变为INITED,并触发函数InitTransition(),该函数会创建MapTask和 ReduceTask,
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
...
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
...
}
其中,createMapTasks函数实现如下:
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i < job.numMapTasks; ++i) {
TaskImpl task =
new MapTaskImpl(job.jobId, i,
job.eventHandler,
job.remoteJobConfFile,
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens,
job.clock, job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
}
}
作业启动
public class MRAppMaster extends CompositeService {
protected void startJobs() {
JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
dispatcher.getEventHandler().handle(startJobEvent);
}
}
JobImpl会接收到.JOB_START事件,会触发作业状态从INITED变为RUNNING,并触发函数StartTransition(),进而触发Map Task和Reduce Task开始调:
public static class StartTransition
implements SingleArcTransition<JobImpl, JobEvent> {
public void transition(JobImpl job, JobEvent event) {
job.scheduleTasks(job.mapTasks);
job.scheduleTasks(job.reduceTasks);
}
}
这之后,所有Map Task和Reduce Task各自负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,则会创建一个运行实例TaskAttempt,如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例TaskAttempt,直到一个TaskAttempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。
- 点赞
- 收藏
- 关注作者
评论(0)