【Hadoop】【Mapreduce】MRAppMaster功能简介

举报
沙漠里的果果酱 发表于 2023/08/09 17:00:58 2023/08/09
【摘要】 【Hadoop】【Mapreduce】MRAppMaster功能简介

MRAppMaster的作用
MRAppMaster是MapReduce的ApplicationMaster的实现,它使得Mapreduce计算框架可以运行在YARN上面。主要有下面的作用:
1-创建MApreduce作业;
2-向ResourceManager申请资源;
3-和NodeManager通信要求其启动Container;
4-监控作业的运行状态,当任务失败的时候重新启动任务等;
5-作业恢复;


事件驱动模型
Yarn使用了基于事件驱动的异步编程模型,它通过事件将各个组件联系起来,并且由一个中央事件调度器统一将各个事件分配给对应的事件处理器。在Yarn中每一个 组件都是一种事件处理器。
MRAppMaster是一个独立的进程:org.apache.hadoop.mapreduce.v2.app.MRAppMaster#main。由AppClient向Yarn申请Container之后启动。MRAppMaster启动之后,它们会以服务的形式注册到MRAppMaster的中央事件调度器AsyncDispatcher,并且告诉调度器它们处理的事件类型,这样,当出现某一种事件的时候,MRAppMaster会查询自身维护的<事件,事件处理器>映射表,并且将该事件分配给对应的事件处理器。


MRAppMaster各种角色功能
Dispatcher:事件调度器,使用的实现类为AsyncDispatcher。可以向AsyncDispatcher注册事件处理器,它内部有一个阻塞队列用来存放所有事件,由一个线程来通过事件的类型来匹配事件处理器,最终将事件交给相应的事件处理器来处理。它是整个MRAppMaster的引擎。
Job:代表整个MapReduce作业,使用的实现类为JobImpl。它内部有一个状态机,负责处理JobEventType类型的事件并改变Job的状态。
Task:代表一个Map任务或Reduce任务,Map任务使用的实现类为MapTaskImpl,Reduce任务使用的实现类为ReduceTaskImpl。它内部有一个状态机,负责处理TaskEventType类型的事件并改变Task的状态。
TaskAttempt:代表一次Task运行尝试,使用的实现类为TaskAttemptImpl,每个Task可能会有多次运行尝试。它内部有一个状态机,负责处理TaskAttemptEventType类型的事件并改变TaskAttempt的状态。
ContainerAllocator:容器资源分配器。在非Uber作业中采用心跳的方式向Yarn申请Container,并将Container分配给Task。使用的具体实现为RMContainerAllocator,负责处理ContainerAllocator.EventType类型的事件。
ContainerLauncher:容器启动器。负责准备Task启动的运行环境,并向NodeManager发送启动Container的命令。使用的具体实现为ContainerLauncherImpl,负责处理ContainerLauncher.EventType类型的事件。
TaskAttemptListener:Task运行监听器。当Container启动后,Task就开始运行了,Task通过心跳的方式向TaskAttemptListener汇报自身的运行状况。
Speculator:推测执行器。当一个MapReduce作业的某个Task运行速度明显慢于其他Task时,Speculator会为该Task启动一个备份Task,让它与原任务同时处理同一份数据,谁先计算完成则将谁的结果作为最终结果,并将另一个Task杀掉。该机制可有效防止那些“拖后腿”任务拖慢整个作业的执行进度。使用的具体实现为DefaultSpeculator,负责处理Speculator.EventType类型的事件。
JobHistoryEventHandler:Job运行日志记录器。负责对作业的各个事件记录日志,比如作业创建、作业开始运行、Task开始运行等等,这些日志会被写到HDFS的某个目录下,这对于作业恢复非常有用。当MRAppMaster出现故障时,Yarn会将其重新调度到另外一个节点上。为了避免重新计算,MRAppMaster首先会从HDFS上读取上次运行产生的日志,以恢复已经运行完成的任务,进而能够只运行尚未运行完成的任务。同时用户还可以通过这些日志了解作业的运行状态。
ClientService:MRAppMaster提供的客户端服务,AppClient可以通过该服务了解作业的运行状况和杀死作业。

MRAppMaster中的状态机

作业运行流程
下面详细分析下一个Mapreduce作业,从提交到运行结束的整个流程。
  有这样的依赖关系

MRAppMaster

public static void main(String[] args) {
  try {
    mainStarted = true;
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    String containerIdStr =
        System.getenv(Environment.CONTAINER_ID.name());
    String nodeHostString = System.getenv(Environment.NM_HOST.name());
    String nodePortString = System.getenv(Environment.NM_PORT.name());
    String nodeHttpPortString =
        System.getenv(Environment.NM_HTTP_PORT.name());
    String appSubmitTimeStr =
        System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
    
    validateInputParam(containerIdStr,
        Environment.CONTAINER_ID.name());
    validateInputParam(nodeHostString, Environment.NM_HOST.name());
    validateInputParam(nodePortString, Environment.NM_PORT.name());
    validateInputParam(nodeHttpPortString,
        Environment.NM_HTTP_PORT.name());
    validateInputParam(appSubmitTimeStr,
        ApplicationConstants.APP_SUBMIT_TIME_ENV);

    ContainerId containerId = ContainerId.fromString(containerIdStr);
    ApplicationAttemptId applicationAttemptId =
        containerId.getApplicationAttemptId();
    if (applicationAttemptId != null) {
      CallerContext.setCurrent(new CallerContext.Builder(
          "mr_appmaster_" + applicationAttemptId.toString()).build());
    }
    long appSubmitTime = Long.parseLong(appSubmitTimeStr);
    MRAppMaster appMaster =
        new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
            Integer.parseInt(nodePortString),
            Integer.parseInt(nodeHttpPortString), appSubmitTime);
    ShutdownHookManager.get().addShutdownHook(
      new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
    JobConf conf = new JobConf(new YarnConfiguration());
    conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
    
    MRWebAppUtil.initialize(conf);
    // log the system properties
    String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
    if (systemPropsToLog != null) {
      LOG.info(systemPropsToLog);
    }

    String jobUserName = System
        .getenv(ApplicationConstants.Environment.USER.name());
    conf.set(MRJobConfig.USER_NAME, jobUserName);
    initAndStartAppMaster(appMaster, conf, jobUserName);
  } catch (Throwable t) {
    LOG.error("Error starting MRAppMaster", t);
    ExitUtil.terminate(1, t);
  }
}

MRAppMaster

protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName) throws IOException,
    InterruptedException {
  UserGroupInformation.setConfiguration(conf);
  // MAPREDUCE-6565: need to set configuration for SecurityUtil.
  SecurityUtil.setConfiguration(conf);
  // Security framework already loaded the tokens into current UGI, just use
  // them
  Credentials credentials =
      UserGroupInformation.getCurrentUser().getCredentials();
  LOG.info("Executing with tokens: {}", credentials.getAllTokens());
  
  UserGroupInformation appMasterUgi = UserGroupInformation
      .createRemoteUser(jobUserName);
  appMasterUgi.addCredentials(credentials);

  // Now remove the AM->RM token so tasks don't have it
  Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
  while (iter.hasNext()) {
    Token<?> token = iter.next();
    if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
      iter.remove();
    }
  }
  conf.getCredentials().addAll(credentials);
  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      appMaster.init(conf);
      appMaster.start();
      if(appMaster.errorHappenedShutDown) {
        throw new IOException("Was asked to shut down.");
      }
      return null;
    }
  });
}

AbstractService

public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          LOG.debug("Service {} is started", getName());
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

MRAppMaster

public class MRAppMaster extends CompositeService {
  public void start() {
    ...
    job = createJob(getConfig());//创建Job
    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
    jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask,ReduceTask
    startJobs();//启动作业,这是后续一切动作的触发之源
    ...
  }
  protected Job createJob(Configuration conf) {
     Job newJob =
       new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
         taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
         completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
         currentUser.getUserName(), appSubmitTime, amInfos, context);
         ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
     dispatcher.register(JobFinishEvent.Type.class,
        createJobFinishEventHandler());
     return newJob;
 }
}

JobImpl会接收到.JOB_INIT事件,然后触发作业状态从NEW变为INITED,并触发函数InitTransition(),该函数会创建MapTask和 ReduceTask,

public static class InitTransition
      implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
  ...
  createMapTasks(job, inputLength, taskSplitMetaInfo);
  createReduceTasks(job);
  ...
}

其中,createMapTasks函数实现如下:

private void createMapTasks(JobImpl job, long inputLength,
                                TaskSplitMetaInfo[] splits) {
      for (int i=0; i < job.numMapTasks; ++i) {
        TaskImpl task =
            new MapTaskImpl(job.jobId, i,
                job.eventHandler,
                job.remoteJobConfFile,
                job.conf, splits[i],
                job.taskAttemptListener,
                job.committer, job.jobToken, job.fsTokens,
                job.clock, job.completedTasksFromPreviousRun,
                job.applicationAttemptId.getAttemptId(),
                job.metrics, job.appContext);
        job.addTask(task);
      }
    }


作业启动

public class MRAppMaster extends CompositeService {
  protected void startJobs() {
    JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
    dispatcher.getEventHandler().handle(startJobEvent);
  }
}

JobImpl会接收到.JOB_START事件,会触发作业状态从INITED变为RUNNING,并触发函数StartTransition(),进而触发Map Task和Reduce Task开始调:

public static class StartTransition
  implements SingleArcTransition<JobImpl, JobEvent> {
   public void transition(JobImpl job, JobEvent event) {
      job.scheduleTasks(job.mapTasks);
      job.scheduleTasks(job.reduceTasks);
  }
}


这之后,所有Map Task和Reduce Task各自负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,则会创建一个运行实例TaskAttempt,如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例TaskAttempt,直到一个TaskAttempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。