【Hadoop】【MapReduce】MRAppMaster Source Code Analysis
MRAppMaster runs in the first container of an MR job; its entry point is MRAppMaster.main(). The code lives under hadoop-mapreduce-project >>> hadoop-mapreduce-client >>> hadoop-mapreduce-client-app and looks like this:
public static void main(String[] args) {
try {
mainStarted = true;
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
String containerIdStr = System.getenv(Environment.CONTAINER_ID.name());
String nodeHostString = System.getenv(Environment.NM_HOST.name());
String nodePortString = System.getenv(Environment.NM_PORT.name());
String nodeHttpPortString = System.getenv(Environment.NM_HTTP_PORT.name());
String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
validateInputParam(containerIdStr, Environment.CONTAINER_ID.name());
validateInputParam(nodeHostString, Environment.NM_HOST.name());
validateInputParam(nodePortString, Environment.NM_PORT.name());
validateInputParam(nodeHttpPortString, Environment.NM_HTTP_PORT.name());
validateInputParam(appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV);
ContainerId containerId = ContainerId.fromString(containerIdStr);
//A ContainerId is prefixed with the application (attempt) id, so the attempt id can be parsed out of the containerId
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
if (applicationAttemptId != null) {
CallerContext.setCurrent(new CallerContext.Builder("mr_appmaster_" + applicationAttemptId.toString()).build());
}
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
MRAppMaster appMaster = new MRAppMaster(
applicationAttemptId,//application attempt ID
containerId, //container ID
nodeHostString,//NM hostname
Integer.parseInt(nodePortString),//NM port
Integer.parseInt(nodeHttpPortString), //NM HTTP port
appSubmitTime);//submission time of the app
JobConf conf = new JobConf(new YarnConfiguration());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));//load job.xml from the classpath into memory
//ShutdownHookManager runs registered shutdown hooks in priority order
ShutdownHookManager.get().addShutdownHook(
new MRAppMasterShutdownHook(appMaster, conf, applicationAttemptId.getApplicationId()), SHUTDOWN_HOOK_PRIORITY);
//Initialize MRWebAppUtil, which mainly resolves the HTTP policy for talking to YARN and the HTTP policy for talking to the JobHistoryServer (JHS).
MRWebAppUtil.initialize(conf);
// log the system properties
String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
if (systemPropsToLog != null) {
LOG.info(systemPropsToLog);
}
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
conf.set(MRJobConfig.USER_NAME, jobUserName);
//The core call: initialize and start the MRAppMaster object created above.
initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) {
LOG.error("Error starting MRAppMaster", t);
ExitUtil.terminate(1, t);
}
}
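As an aside, the attempt ID really is recoverable from the container ID string alone, since the string embeds the cluster timestamp, application ID, and attempt number. A minimal sketch using the public YARN records (the ID value below is made up for illustration):
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
public class ContainerIdDemo {
public static void main(String[] args) {
// Format: container_<clusterTimestamp>_<appId>_<attemptNum>_<containerNum>
ContainerId cid = ContainerId.fromString("container_1597477099278_0007_01_000001");
ApplicationAttemptId attemptId = cid.getApplicationAttemptId();
ApplicationId appId = attemptId.getApplicationId();
System.out.println(attemptId); // appattempt_1597477099278_0007_000001
System.out.println(appId);     // application_1597477099278_0007
}
}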
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
final JobConf conf, String jobUserName) throws IOException,
InterruptedException {
UserGroupInformation.setConfiguration(conf);
// MAPREDUCE-6565: need to set configuration for SecurityUtil.
SecurityUtil.setConfiguration(conf);
// Security framework already loaded the tokens into current UGI, just use
// them
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
LOG.info("Executing with tokens: {}", credentials.getAllTokens());
//Create a UGI for the user who submitted the job, and hand the current user's Credentials to the newly created UGI
UserGroupInformation appMasterUgi = UserGroupInformation.createRemoteUser(jobUserName);
appMasterUgi.addCredentials(credentials);
// Now remove the AM->RM token so tasks don't have it
// (ordinary tasks must not carry the AM-RM token, so strip it from the Credentials)
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
Token<?> token = iter.next();
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
iter.remove();
}
}
conf.getCredentials().addAll(credentials);
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
appMaster.init(conf);
appMaster.start();
if(appMaster.errorHappenedShutDown) {
throw new IOException("Was asked to shut down.");
}
return null;
}
});
}
protected void serviceStart() throws Exception {
//Uber mode, introduced in 2.x, can be loosely understood as JVM reuse: when an MR job runs in uber mode,
//all map tasks and reduce tasks run inside the ApplicationMaster's own container. Only the AM container is
//ever launched; since no separate mapper/reducer containers are needed, the AM does not have to talk to
//remote containers, which greatly simplifies the whole flow.
if (job.isUber()) {
MRApps.setupDistributedCacheLocal(getConfig());
this.containerAllocator = new LocalContainerAllocator(
this.clientService, this.context, nmHost, nmPort, nmHttpPort
, containerID);
} else {
this.containerAllocator = new RMContainerAllocator(
this.clientService, this.context, preemptionPolicy);
}
((Service)this.containerAllocator).init(getConfig());
((Service)this.containerAllocator).start();
super.serviceStart();
}
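Whether a job actually runs in uber mode is decided from client-side configuration. A brief sketch of the relevant knobs (the key names are the standard MRJobConfig ones; the values shown are the defaults):
import org.apache.hadoop.mapred.JobConf;
public class UberConfigSketch {
public static void main(String[] args) {
JobConf conf = new JobConf();
// Opt the job into uber mode; the AM still verifies the job is small enough.
conf.setBoolean("mapreduce.job.ubertask.enable", true);
// Uber-eligibility limits:
conf.setInt("mapreduce.job.ubertask.maxmaps", 9);    // at most 9 map tasks
conf.setInt("mapreduce.job.ubertask.maxreduces", 1); // at most 1 reduce task
// mapreduce.job.ubertask.maxbytes defaults to the DFS block size if unset.
}
}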
For non-Uber mode (the common case), containerAllocator is an RMContainerAllocator, and each AM owns exactly one RMContainerAllocator instance.
Its main job is to talk to the ResourceManager: it loops over ContainerAllocator events and asks the scheduler to allocate containers.
protected void serviceStart() throws Exception {
this.eventHandlingThread = new Thread() {
@SuppressWarnings("unchecked")
@Override
public void run() {
ContainerAllocatorEvent event;
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
//This service keeps taking container-allocation requests off RMContainerAllocator.eventQueue
event = RMContainerAllocator.this.eventQueue.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return;
}
try {
handleEvent(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " to the ContainreAllocator", t);
// Kill the AM
eventHandler.handle(new JobEvent(getJob().getID(),
JobEventType.INTERNAL_ERROR));
return;
}
}
}
};
this.eventHandlingThread.start();
super.serviceStart();
}
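For completeness, the producer side of eventQueue is RMContainerAllocator.handle(), which the central dispatcher invokes; a simplified excerpt-style sketch (the queue-size logging in the real source is omitted):
@Override
public void handle(ContainerAllocatorEvent event) {
try {
// Only enqueue; the event-handling thread started above drains the queue asynchronously.
eventQueue.put(event);
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
}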
Each container request triggers one event:
//Handles a single container resource request. There are three kinds:
//1 - request a container;
//2 - cancel (deallocate) a container request;
//3 - report a failed container;
protected synchronized void handleEvent(ContainerAllocatorEvent event) {
recalculateReduceSchedule = true;
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
boolean isMap = reqEvent.getAttemptID().getTaskId().getTaskType().
equals(TaskType.MAP);
if (isMap) {
//handle a map container request
handleMapContainerRequest(reqEvent);
} else {
//handle a reduce container request
handleReduceContainerRequest(reqEvent);
}
taskManager.addToTaskAttemptBlacklist(reqEvent);
} else if (event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
LOG.info("Processing the event " + event.toString());
TaskAttemptId aId = event.getAttemptID();
boolean removed = scheduledRequests.remove(aId);
if (!removed) {
ContainerId containerId = assignedRequests.get(aId);
if (containerId != null) {
removed = true;
assignedRequests.remove(aId);
containersReleased++;
pendingRelease.add(containerId);
release(containerId);
}
}
if (!removed) {
LOG.error("Could not deallocate container for task attemptId " +
aId);
}
preemptionPolicy.handleCompletedContainer(event.getAttemptID());
} else if (
event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
ContainerFailedEvent fEv = (ContainerFailedEvent) event;
String host = getHost(fEv.getContMgrAddress());
containerFailedOnHost(host);
// propagate failures to preemption policy to discard checkpoints for
// failed tasks
preemptionPolicy.handleFailedContainer(event.getAttemptID());
}
}
As we know, the resource amounts requested for a MapReduce job's map tasks and reduce tasks can be tuned independently through client-side parameters, so map container requests and reduce container requests are handled separately here.
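For reference, these per-task resource profiles are set on the client; a minimal sketch (the keys are the standard MRJobConfig ones, the values are illustrative):
import org.apache.hadoop.mapred.JobConf;
public class TaskResourceSketch {
public static void main(String[] args) {
JobConf conf = new JobConf();
// Per-map-container resources:
conf.setInt("mapreduce.map.memory.mb", 2048);
conf.setInt("mapreduce.map.cpu.vcores", 1);
// Per-reduce-container resources:
conf.setInt("mapreduce.reduce.memory.mb", 4096);
conf.setInt("mapreduce.reduce.cpu.vcores", 2);
}
}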
The handleEvent() above deals with three kinds of container operations in total:
1 - request a container;
2 - cancel a container request;
3 - report a failed container;
Below we only dissect the map-task container request path in detail.
RMContainerAllocator.handleMapContainerRequest()
private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
TaskType.MAP));
//When the AM registers with the RM, the response carries the max container capability (the RM-side cap on the CPU and memory a single container may request)
Resource supportedMaxContainerCapability = getMaxContainerCapability();
JobId jobId = getJob().getID();
//Initialize the map container resource request; if it is already initialized, skip it. In other words, no matter how many map tasks there are, they all request the same resource profile.
if (mapResourceRequest.equals(Resources.none())) {
mapResourceRequest = reqEvent.getCapability();
eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest.getMemorySize())));
LOG.info("mapResourceRequest:" + mapResourceRequest);
}
//If the requested map container exceeds the RM-configured cap, the request is not accepted and the job is killed
boolean mapContainerRequestAccepted = true;
if (mapResourceRequest.getMemorySize() > supportedMaxContainerCapability.getMemorySize() ||
mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability.getVirtualCores()) {
mapContainerRequestAccepted = false;
}
if(mapContainerRequestAccepted) {
// set the resources
reqEvent.getCapability().setMemorySize(mapResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(mapResourceRequest.getVirtualCores());
scheduledRequests.addMap(reqEvent); //maps are immediately scheduled
} else {
String diagMsg = "The required MAP capability is more than the " +
"supported max container capability in the cluster. Killing" +
" the Job. mapResourceRequest: " + mapResourceRequest +
" maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
}
}
As handleMapContainerRequest() shows, the container request event is added to a map. Later, scheduledRequests.assign(), called from the heartbeat, tries to hand the granted resources to specific tasks. So there are two phases: requesting resources and assigning them.
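That "map" is ScheduledRequests.maps, keyed by TaskAttemptId; scheduledRequests.addMap() also indexes the attempt by its preferred hosts and racks, which the locality matching later on relies on. A simplified sketch of addMap() under those assumptions (the fast-fail branch and logging are trimmed):
void addMap(ContainerRequestEvent event) {
// Index the attempt by each preferred host and rack so that
// assignMapsWithLocality() can later match granted containers to it.
for (String host : event.getHosts()) {
mapsHostMapping.computeIfAbsent(host, k -> new LinkedList<>())
.add(event.getAttemptID());
}
for (String rack : event.getRacks()) {
mapsRackMapping.computeIfAbsent(rack, k -> new LinkedList<>())
.add(event.getAttemptID());
}
ContainerRequest request = new ContainerRequest(event, PRIORITY_MAP, clock.getTime());
maps.put(event.getAttemptID(), request); // pending, not yet assigned
addContainerReq(request);                // joins the "ask" set sent to the RM
}
The heartbeat() below then drives both phases on every AM-RM round trip: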
protected synchronized void heartbeat() throws Exception {
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
//ask the RM for container resources
List<Container> allocatedContainers = getResources();
if (allocatedContainers != null && allocatedContainers.size() > 0) {
long start = System.currentTimeMillis();
//The granted containers are just bundles of resources, not yet tied to any task. Here they are assigned to concrete tasks to run.
scheduledRequests.assign(allocatedContainers);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Time taken to assign=" + (System.currentTimeMillis() - start));
}
}
int completedMaps = getJob().getCompletedMaps();
int completedTasks = completedMaps + getJob().getCompletedReduces();
if ((lastCompletedTasks != completedTasks) || (scheduledRequests.maps.size() > 0)) {
//update the number of completed tasks and trigger reduce scheduling
lastCompletedTasks = completedTasks;
recalculateReduceSchedule = true;
}
if (recalculateReduceSchedule) {
//Check whether reduce resources must be preempted: reducers might occupy everything that was granted, leaving maps starved of resources forever and the job stuck.
boolean reducerPreempted = preemptReducesIfNeeded();
if (!reducerPreempted) {
// Only schedule new reducers if no reducer preemption happens for
// this heartbeat
// Decide how many new reducers to ramp up, based on map progress.
scheduleReduces(getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceRequest, reduceResourceRequest, pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
}
recalculateReduceSchedule = false;
}
scheduleStats.updateAndLogIfChanged("After Scheduling: ");
}
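The reducer ramp-up seen in scheduleReduces() is tunable from the client; a one-knob sketch (the key is the standard MRJobConfig one, the value shown is the default):
import org.apache.hadoop.mapred.JobConf;
public class ReduceSlowStartSketch {
public static void main(String[] args) {
JobConf conf = new JobConf();
// Fraction of maps that must finish before reducers are scheduled.
// Raising it delays reducers and lowers the risk that they hold
// resources that pending maps still need.
conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.05f);
}
}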
As the flow above shows, putting map or reduce container requests into a map, and having the heartbeat thread read that map and assemble a request for the RM, are two asynchronous processes.
1 - makeRemoteRequest(), called by the AM-RM heartbeat thread, builds a resource request whose two main parts are ask and release: the containers to request and the containers to give back. Entries are added to ask by handleMapContainerRequest() and handleReduceContainerRequest(); the heartbeat thread then reads ask.
2 - The container-request event handler is split by request type into handleMapContainerRequest() and handleReduceContainerRequest(), and it is in these methods that the container requests are added to ask;
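Conceptually, the payload makeRemoteRequest() sends each heartbeat is a YARN AllocateRequest built from those ask and release sets. A hedged sketch using only the public YARN records (the helper method and the numbers are illustrative, not the actual RMContainerRequestor code):
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
public class AllocateSketch {
// Illustrative helper: build the kind of payload one heartbeat carries.
static AllocateRequest buildHeartbeatPayload(int lastResponseID, float progress) {
List<ResourceRequest> ask = new ArrayList<>();
// e.g. 3 map-priority containers of 2048 MB / 1 vcore, placed anywhere ("*");
// PRIORITY_MAP is 20 in RMContainerAllocator.
ask.add(ResourceRequest.newInstance(
Priority.newInstance(20), "*", Resource.newInstance(2048, 1), 3));
List<ContainerId> release = new ArrayList<>(); // containers to give back
return AllocateRequest.newInstance(lastResponseID, progress, ask, release, null);
}
}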
Next, let's look in detail at how the heartbeat thread fetches resources on each round:
RMContainerAllocator.getResources()
//AM-to-RM resource request
private List<Container> getResources() throws Exception {
applyConcurrentTaskLimits();
//Resources currently available (headroom) to the AM; zero on the first heartbeat;
Resource headRoom = Resources.clone(getAvailableResources());
AllocateResponse response;
/*
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
* milliseconds before aborting. During this interval, AM will still try
* to contact the RM.
*/
try {
//request and release resources according to the ask and release sets.
response = makeRemoteRequest();
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();
//Case 1: the RM no longer recognizes this attempt (e.g., after an RM restart)
} catch (ApplicationAttemptNotFoundException e ) {
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),JobEventType.JOB_AM_REBOOT));
throw new RMContainerAllocationException("Resource Manager doesn't recognize AttemptId: " + this.getContext().getApplicationAttemptId(), e);
//Case 2: the app is known, but this AM is not registered;
} catch (ApplicationMasterNotRegisteredException e) {
LOG.info("ApplicationMaster is out of sync with ResourceManager," + " hence resync and send outstanding requests.");
// RM may have restarted, re-register with RM.
lastResponseID = 0;
register();
addOutstandingRequestOnResync();
return null;
//Case 3: malformed resource request; kill the job
} catch (InvalidLabelResourceRequestException e) {
// If Invalid label exception is received means the requested label doesnt
// have access so killing job in this case.
String diagMsg = "Requested node-label-expression is invalid: "
+ StringUtils.stringifyException(e);
LOG.info(diagMsg);
JobId jobId = this.getJob().getID();
eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
throw e;
} catch (Exception e) {
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new RMContainerAllocationException("Could not contact RM after " +
retryInterval + " milliseconds.");
}
// Throw this up to the caller, which may decide to ignore it and
// continue to attempt to contact the RM.
throw e;
}
//fetch the updated headroom
Resource newHeadRoom = getAvailableResources();
List<Container> newContainers = response.getAllocatedContainers();
// Setting NMTokens
if (response.getNMTokens() != null) {
for (NMToken nmToken : response.getNMTokens()) {
NMTokenCache.setNMToken(nmToken.getNodeId().toString(),
nmToken.getToken());
}
}
// Setting AMRMToken
if (response.getAMRMToken() != null) {
updateAMRMToken(response.getAMRMToken());
}
List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
// propagate preemption requests
// The preemption policy is configured on the client via yarn.app.mapreduce.am.preemption.policy.
// Three implementations ship by default:
// 1 - CheckpointAMPreemptionPolicy
// 2 - KillAMPreemptionPolicy
// 3 - NoneAMPreemptionPolicy
// If the response carries a preemption message, the AM must release the requested containers according to the policy.
final PreemptionMessage preemptReq = response.getPreemptionMessage();
if (preemptReq != null) {
//assignedRequests holds the containers this AM has already assigned to tasks
preemptionPolicy.preempt(new PreemptionContext(assignedRequests), preemptReq);
}
if (newContainers.size() + finishedContainers.size() > 0 || !headRoom.equals(newHeadRoom)) {
//something changed
recalculateReduceSchedule = true;
if (LOG.isDebugEnabled() && !headRoom.equals(newHeadRoom)) {
LOG.debug("headroom=" + newHeadRoom);
}
}
if (LOG.isDebugEnabled()) {
for (Container cont : newContainers) {
LOG.debug("Received new Container :" + cont);
}
}
//Called on each allocation. Will know about newly blacklisted/added hosts.
//recompute the blacklist from the node updates (added/removed nodes) returned by the RM
computeIgnoreBlacklisting();
//Adjust task attempts based on NodeManager blacklist changes: a task attempt running on an unusable node must be killed
handleUpdatedNodes(response);
//the client may have changed the job's priority while it was running
handleJobPriorityChange(response);
// Handle receiving the timeline collector address and token for this app.
MRAppMaster.RunningAppContext appContext = (MRAppMaster.RunningAppContext)this.getContext();
if (appContext.getTimelineV2Client() != null) {
appContext.getTimelineV2Client().setTimelineCollectorInfo(response.getCollectorInfo());
}
for (ContainerStatus cont : finishedContainers) {
//release the resources of containers that have finished running;
processFinishedContainer(cont);
}
//return the newly granted containers from the response
return newContainers;
}
The above covered how getResources() sends the resource request to the RM and how it reacts to the information in the response, finally returning the containers granted in this round.
Handing a granted container to a concrete task is done in assign(); the code is as follows:
private void assign(List<Container> allocatedContainers) {
Iterator<Container> it = allocatedContainers.iterator();
LOG.info("Got allocated containers " + allocatedContainers.size());
//containersAllocated is a field of RMContainerAllocator (one instance per AM); it counts the containers this AM has been granted but not yet assigned to a task attempt;
containersAllocated += allocatedContainers.size();
int reducePending = reduces.size();
while (it.hasNext()) {
Container allocated = it.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Assigning container " + allocated.getId()
+ " with priority " + allocated.getPriority() + " to NM "
+ allocated.getNodeId());
}
// check if allocated container meets memory requirements
// and whether we have any scheduled tasks that need
// a container to be assigned
boolean isAssignable = true;
Priority priority = allocated.getPriority();
Resource allocatedResource = allocated.getResource();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)
|| PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
//Every granted container carries a priority that tells which kind of task attempt (map or reduce) it is meant for.
//We must check whether the container's resources match what the task needs; if not, the container is unusable and must be released.
//If the scheduler later notices the container timed out (was not handed to a task within a certain time), it reclaims it.
if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
mapResourceRequest, getSchedulerResourceTypes()) <= 0 || maps.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a map as either "
+ " container memory less than required " + mapResourceRequest
+ " or no pending map tasks - maps.isEmpty="
+ maps.isEmpty());
isAssignable = false;
}
}
else if (PRIORITY_REDUCE.equals(priority)) {
if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
reduceResourceRequest, getSchedulerResourceTypes()) <= 0
|| (reducePending <= 0)) {
LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
+ " container memory less than required " + reduceResourceRequest
+ " or no pending reduce tasks.");
isAssignable = false;
} else {
reducePending--;
}
} else {
LOG.warn("Container allocated at unwanted priority: " + priority +
". Returning to RM...");
isAssignable = false;
}
if(!isAssignable) {
// release container if we could not assign it
containerNotAssigned(allocated);
it.remove();
continue;
}
// do not assign if allocated container is on a blacklisted host
// This can happen when containers granted earlier are still unassigned, but this heartbeat's response
// blacklists the nodes they sit on; those containers are unusable too.
String allocatedHost = allocated.getNodeId().getHost();
if (isNodeBlacklisted(allocatedHost)) {
// we need to request for a new container
// and release the current one
LOG.info("Got allocated container on a blacklisted "
+ " host "+allocatedHost
+". Releasing container " + allocated);
// find the request matching this allocated container
// and replace it with a new one
ContainerRequest toBeReplacedReq = getContainerReqToReplace(allocated);
if (toBeReplacedReq != null) {
LOG.info("Placing a new container request for task attempt "
+ toBeReplacedReq.attemptID);
ContainerRequest newReq = getFilteredContainerRequest(toBeReplacedReq);
decContainerReq(toBeReplacedReq);
//maps and reduces hold the container requests that have been scheduled but not yet assigned.
if (toBeReplacedReq.attemptID.getTaskId().getTaskType() == TaskType.MAP) {
maps.put(newReq.attemptID, newReq);
}
else {
reduces.put(newReq.attemptID, newReq);
}
addContainerReq(newReq);
}
else {
LOG.info("Could not map allocated container to a valid request."
+ " Releasing allocated container " + allocated);
}
// release container if we could not assign it
containerNotAssigned(allocated);
it.remove();
continue;
}
}
//None of the operations above actually assigned a container to a task; they only filtered the containers. Whatever is left in allocatedContainers is genuinely usable.
//The assignment itself happens here; the core of the whole method is this one line:
assignContainers(allocatedContainers);
// release container if we could not assign it
//containers that were not assigned are released directly.
it = allocatedContainers.iterator();
while (it.hasNext()) {
Container allocated = it.next();
LOG.info("Releasing unassigned container " + allocated);
containerNotAssigned(allocated);
}
}
private void assignContainers(List<Container> allocatedContainers) {
Iterator<Container> it = allocatedContainers.iterator();
while (it.hasNext()) {
Container allocated = it.next();
//fast-fail (previously failed) maps and reduces are handled specially
ContainerRequest assigned = assignWithoutLocality(allocated);
if (assigned != null) {
containerAssigned(allocated, assigned);
it.remove();
}
}
//only after failed maps and reduces are served do ordinary map tasks get resources
assignMapsWithLocality(allocatedContainers);
}
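assignWithoutLocality() serves the two priorities that need no locality matching: fast-fail maps and reduces. A simplified sketch of its logic (logging and edge cases trimmed):
private ContainerRequest assignWithoutLocality(Container allocated) {
ContainerRequest assigned = null;
Priority priority = allocated.getPriority();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
// previously failed maps get containers first
assigned = assignToFailedMap(allocated);
} else if (PRIORITY_REDUCE.equals(priority)) {
// reduces have no data locality: hand out the first pending request
if (reduces.size() > 0) {
TaskAttemptId tId = reduces.keySet().iterator().next();
assigned = reduces.remove(tId);
}
}
return assigned;
}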
Let's use map task assignment as the example:
private void assignMapsWithLocality(List<Container> allocatedContainers) {
// first, serve the tasks that asked for node-local placement
Iterator<Container> it = allocatedContainers.iterator();
while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert (PRIORITY_MAP.equals(priority) || PRIORITY_OPPORTUNISTIC_MAP.equals(priority));
if (!PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
// "if (maps.containsKey(tId))" below should be almost always true.
// hence this while loop would almost always have O(1) complexity
String host = allocated.getNodeId().getHost();
//mapsHostMapping maps a host to the set of task attempts that want to run on that node; i.e. the list holds the attempts
//eligible for this host, and therefore eligible for this container;
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
while (list != null && list.size() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Host matched to the request list " + host);
}
TaskAttemptId tId = list.removeFirst();
if (maps.containsKey(tId)) {
ContainerRequest assigned = maps.remove(tId);
taskManager.removeFromTaskAttemptBlacklist(tId);
containerAssigned(allocated, assigned);
it.remove();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
eventHandler.handle(jce);
hostLocalAssigned++;
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned based on host match " + host);
}
break;
}
}
}
}
// try to match all rack local
it = allocatedContainers.iterator();
while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert (PRIORITY_MAP.equals(priority)
|| PRIORITY_OPPORTUNISTIC_MAP.equals(priority));
if (!PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
// "if (maps.containsKey(tId))" below should be almost always true.
// hence this while loop would almost always have O(1) complexity
String host = allocated.getNodeId().getHost();
String rack = RackResolver.resolve(host).getNetworkLocation();
//look up rack-local candidates in mapsRackMapping
LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
while (list != null && list.size() > 0) {
TaskAttemptId tId = list.removeFirst();
if (maps.containsKey(tId)) {
ContainerRequest assigned = maps.remove(tId);
taskManager.removeFromTaskAttemptBlacklist(tId);
containerAssigned(allocated, assigned);
it.remove();
JobCounterUpdateEvent jce =
new JobCounterUpdateEvent(assigned.attemptID.getTaskId()
.getJobId());
jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
eventHandler.handle(jce);
rackLocalAssigned++;
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned based on rack match " + rack);
}
break;
}
}
}
}
// assign resources to the remaining maps, ignoring node and rack: arbitrary placement;
it = allocatedContainers.iterator();
while (it.hasNext() && maps.size() > 0 && canAssignMaps()) {
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert (PRIORITY_MAP.equals(priority)
|| PRIORITY_OPPORTUNISTIC_MAP.equals(priority));
Iterator<TaskAttemptId> itrMaps = maps.keySet().iterator();
while (itrMaps.hasNext()) {
TaskAttemptId tId = itrMaps.next();
// check blacklisted with tId and allocated container
if (taskManager.isNodeBlacklisted(tId, allocated)) {
continue;
} else {
ContainerRequest assigned = maps.remove(tId);
taskManager.removeFromTaskAttemptBlacklist(tId);
containerAssigned(allocated, assigned);
it.remove();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(
assigned.attemptID.getTaskId().getJobId());
jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
eventHandler.handle(jce);
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned based on * match taskId=" + tId);
}
break;
}
}
}
}
}
The method above is long but mostly repetitive. It has three passes:
1 - first assign to tasks that asked for specific nodes;
2 - then to tasks that asked for specific racks;
3 - finally to tasks with no placement constraint;
The core is really a single call: containerAssigned(allocated, assigned);
private void containerAssigned(Container allocated, ContainerRequest assigned) {
// Update resource requests
decContainerReq(assigned);
// send the container-assigned event to task attempt
eventHandler.handle(new TaskAttemptContainerAssignedEvent(assigned.attemptID, allocated, applicationACLs));
assignedRequests.add(allocated, assigned.attemptID);
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned container (" + allocated + ") "
+ " to task " + assigned.attemptID + " on node "
+ allocated.getNodeId().toString());
}
}
TaskAttemptContainerAssignedEvent extends TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType>
MRAppMaster.serviceInit() registers the TaskAttemptEventType events, so the handler for the event above is:
private class TaskAttemptEventDispatcher
implements EventHandler<TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void handle(TaskAttemptEvent event) {
Job job = context.getJob(event.getTaskAttemptID().getTaskId().getJobId());
Task task = job.getTask(event.getTaskAttemptID().getTaskId());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
}
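The registration itself, in MRAppMaster.serviceInit(), looks roughly like this (an excerpt-style sketch; the real method registers many more event types):
// in MRAppMaster.serviceInit(), simplified:
dispatcher = createDispatcher(); // an AsyncDispatcher
dispatcher.register(JobEventType.class, new JobEventDispatcher());
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());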
That is the entire flow of an AM initializing, starting, requesting container resources, and assigning containers to tasks.
Next we look at another important part of MRAppMaster (AM history collection, i.e. JobHistory): JobHistoryEventHandler. It was left out of the main flow above to avoid branching off too often, but it matters just as much.
/**
* The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a
* done-dir. JobHistory implementation is in this package to access package
* private classes.
*/
public class JobHistoryEventHandler extends AbstractService implements EventHandler<JobHistoryEvent>
The inheritance makes it clear that JobHistoryEventHandler is both an event handler and a Service. Events reach it through the central dispatcher, AsyncDispatcher.
Every event handler must be registered with the central dispatcher (AsyncDispatcher), and JobHistoryEventHandler is no exception: it is registered in MRAppMaster's serviceInit() method.
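The registration amounts to two lines (a simplified excerpt-style sketch of MRAppMaster.serviceInit()):
EventHandler<JobHistoryEvent> historyService = createJobHistoryHandler(context);
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, historyService);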
When RMContainerAllocator starts, its event-handling thread keeps taking the AM's resource-request events off the queue; while handling a map container request, it fires the JobHistoryEvent. The code fragment:
private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
TaskType.MAP));
//When the AM registers with the RM, the response carries the max container capability (the RM-side cap on the CPU and memory a single container may request)
Resource supportedMaxContainerCapability = getMaxContainerCapability();
JobId jobId = getJob().getID();
//Initialize the map container resource request; if it is already initialized, skip it. In other words, no matter how many map tasks there are, they all request the same resource profile.
if (mapResourceRequest.equals(Resources.none())) {
mapResourceRequest = reqEvent.getCapability();
eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest.getMemorySize())));
LOG.info("mapResourceRequest:" + mapResourceRequest);
}
From the event registrations in MRAppMaster it is easy to see that the handler for JobHistoryEvent is JobHistoryEventHandler.
JobHistoryEventHandler.handle()
public void handle(JobHistoryEvent event) {
try {
if (isJobCompletionEvent(event.getHistoryEvent())) {
// When the job is complete, flush slower but write faster.
maxUnflushedCompletionEvents = maxUnflushedCompletionEvents * postJobCompletionMultiplier;
}
eventQueue.put(event);
// Process it for ATS (if enabled)
if (handleTimelineEvent) {
atsEventDispatcher.getEventHandler().handle(event);
}
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
}
The handle() method merely puts the event onto JobHistoryEventHandler's own event queue.
JobHistoryEventHandler.serviceStart() starts a thread that continuously takes event objects off that queue and processes them via handleEvent():
protected void serviceStart() throws Exception {
if (timelineClient != null) {
timelineClient.start();
} else if (timelineV2Client != null) {
timelineV2Client.start();
}
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
JobHistoryEvent event = null;
while (!stopped && !Thread.currentThread().isInterrupted()) {
// Log the size of the history-event-queue every so often.
if (eventCounter != 0 && eventCounter % 1000 == 0) {
eventCounter = 0;
LOG.info("Size of the JobHistory event queue is " + eventQueue.size());
} else {
eventCounter++;
}
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.info("EventQueue take interrupted. Returning");
return;
}
synchronized (lock) {
boolean isInterrupted = Thread.interrupted();
handleEvent(event);
if (isInterrupted) {
LOG.debug("Event handling interrupted");
Thread.currentThread().interrupt();
}
}
}
}
}, "eventHandlingThread");
eventHandlingThread.start();
if (handleTimelineEvent) {
atsEventDispatcher.start();
}
super.serviceStart();
}
public void handleEvent(JobHistoryEvent event) {
synchronized (lock) {
// If this is JobSubmitted Event, setup the writer
if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
try {
AMStartedEvent amStartedEvent = (AMStartedEvent) event.getHistoryEvent();
setupEventWriter(event.getJobID(), amStartedEvent);
} catch (IOException ioe) {
LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
ioe);
throw new YarnRuntimeException(ioe);
}
}
// For all events
// (1) Write it out
// (2) Process it for JobSummary
// (3) Process it for ATS (if enabled)
//the job's metadata is stored in fileMap
MetaInfo mi = fileMap.get(event.getJobID());
try {
HistoryEvent historyEvent = event.getHistoryEvent();
if (! (historyEvent instanceof NormalizedResourceEvent)) {
mi.writeEvent(historyEvent);
}
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID());
} catch (IOException e) {
LOG.error("Error writing History Event: " + event.getHistoryEvent(),
e);
throw new YarnRuntimeException(e);
}
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
JobSubmittedEvent jobSubmittedEvent = (JobSubmittedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
mi.getJobIndexInfo().setQueueUser(jobSubmittedEvent.getQueueUser());
}
//initialize the launchTime in the JobIndexInfo of MetaInfo
if(event.getHistoryEvent().getEventType() == EventType.JOB_INITED ){
JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setJobStartTime(jie.getLaunchTime());
}
if (event.getHistoryEvent().getEventType() == EventType.JOB_QUEUE_CHANGED) {
JobQueueChangeEvent jQueueEvent =
(JobQueueChangeEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setQueueName(jQueueEvent.getJobQueueName());
}
// If this is JobFinishedEvent, close the writer and setup the job-index
if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
try {
JobFinishedEvent jFinishedEvent = (JobFinishedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getSucceededMaps());
mi.getJobIndexInfo().setNumReduces(jFinishedEvent.getSucceededReduces());
mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
// In case of JOB_ERROR, only process all the Done files(e.g. job
// summary, job history file etc.) if it is last AM retry.
if (event.getHistoryEvent().getEventType() == EventType.JOB_ERROR) {
try {
JobUnsuccessfulCompletionEvent jucEvent =
(JobUnsuccessfulCompletionEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jucEvent.getSucceededMaps());
mi.getJobIndexInfo().setNumReduces(jucEvent.getSucceededReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID());
if(context.isLastAMRetry())
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
|| event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
try {
JobUnsuccessfulCompletionEvent jucEvent = (JobUnsuccessfulCompletionEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jucEvent.getSucceededMaps());
mi.getJobIndexInfo().setNumReduces(jucEvent.getSucceededReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
}
}
The method above is long but single-purpose. It does two things:
1 - Handle the job history AM_STARTED event:
/**
* Create an event writer for the Job represented by the jobID.
* Writes out the job configuration to the log directory.
* This should be the first call to history for a job
*
* @param jobId the jobId.
* @param amStartedEvent
* @throws IOException
*/
stagingDirPath is the directory configured by yarn.app.mapreduce.am.staging-dir (/tmp/hadoop-yarn/staging) plus the suffix {username}/.staging; it records the AM's information, not logs:
-rw-r--r-- 10 super hadoop 316308 2020-08-17 16:09 /tmp/hadoop-yarn/staging/super/.staging/job_1597477099278_0007/job.jar
-rw-r--r-- 10 super hadoop 18329 2020-08-17 16:09 /tmp/hadoop-yarn/staging/super/.staging/job_1597477099278_0007/job.split
-rw-r--r-- 3 super hadoop 5965 2020-08-17 16:09 /tmp/hadoop-yarn/staging/super/.staging/job_1597477099278_0007/job.splitmetainfo
-rw-r--r-- 3 super hadoop 240304 2020-08-17 16:09 /tmp/hadoop-yarn/staging/super/.staging/job_1597477099278_0007/job.xml
These files record how the job was split and can be used to recover it: when the AM dies and is re-launched on another node, this directory is what recovery is based on.
protected void setupEventWriter(JobId jobId, AMStartedEvent amStartedEvent)
throws IOException {
//stagingDirPath is the yarn.app.mapreduce.am.staging-dir directory (/tmp/hadoop-yarn/staging) plus the suffix {username}/.staging; it holds the AM's staged job-history files
if (stagingDirPath == null) {
LOG.error("Log Directory is null, returning");
throw new IOException("Missing Log Directory for History");
}
MetaInfo oldFi = fileMap.get(jobId);
Configuration conf = getConfig();
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
//stagingDirPath is the AM staging directory /tmp/hadoop-yarn/staging/{username}/.staging; startCount is the AM launch count (two retries by default);
Path historyFile = JobHistoryUtils.getStagingJobHistoryFile(stagingDirPath, jobId, startCount);
String user = UserGroupInformation.getCurrentUser().getShortUserName();
if (user == null) {
throw new IOException("User is null while setting up jobhistory eventwriter");
}
String jobName = context.getJob(jobId).getName();
EventWriter writer = (oldFi == null) ? null : oldFi.writer;
// /tmp/hadoop-yarn/staging/{username}/.staging/{job_id}_{startCount}_conf.xml
Path logDirConfPath = JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
if (writer == null) {
try {
writer = createEventWriter(historyFile);
LOG.info("Event Writer setup for JobId: " + jobId + ", File: "
+ historyFile);
} catch (IOException ioe) {
LOG.info("Could not create log file: [" + historyFile + "] + for job "
+ "[" + jobName + "]");
throw ioe;
}
//Write out conf only if the writer isn't already setup.
if (conf != null) {
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
if (logDirConfPath != null) {
Configuration redactedConf = new Configuration(conf);
MRJobConfUtil.redact(redactedConf);
//create the file hdfs://{nameservice}/tmp/hadoop-yarn/staging/{username}/.staging/{job_id}_{startCount}_conf.xml
try (FSDataOutputStream jobFileOut = stagingDirFS.create(logDirConfPath, true)) {
//write the conf into the job conf XML file created above
redactedConf.writeXml(jobFileOut);
} catch (IOException e) {
LOG.info("Failed to write the job configuration file", e);
throw e;
}
}
}
}
String queueName = JobConf.DEFAULT_QUEUE_NAME;
if (conf != null) {
queueName = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
}
//store the job's basic info in fileMap for later job-status updates
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
user, jobName, jobId, amStartedEvent.getForcedJobStateOnShutDown(),
queueName);
fi.getJobSummary().setJobId(jobId);
fi.getJobSummary().setJobLaunchTime(amStartedEvent.getStartTime());
fi.getJobSummary().setJobSubmitTime(amStartedEvent.getSubmitTime());
fi.getJobIndexInfo().setJobStartTime(amStartedEvent.getStartTime());
fi.getJobIndexInfo().setSubmitTime(amStartedEvent.getSubmitTime());
fileMap.put(jobId, fi);
}
2 - Handle the various job-related event types, namely:
EventType.JOB_SUBMITTED
EventType.JOB_INITED
EventType.JOB_QUEUE_CHANGED
EventType.JOB_FINISHED
EventType.JOB_ERROR
EventType.JOB_FAILED
The job's information is kept in a map inside JobHistoryEventHandler that records job metadata, and it is updated continuously as events fire. But this information lives only in memory; it is not written to the HDFS intermediate directory in real time.
Only once a job has finished (succeeded, failed, or killed), in short, once it has ended, does processDoneFiles(event.getJobID()) upload its information to the /mr-history/tmp directory on HDFS.
In other words, the jobhistory intermediate directory also holds only finished jobs' information; contrary to what many blog posts claim, it does not hold real-time job information.
The JobHistoryServer's periodic scanner therefore watches the intermediate directory and compares its modification time against the previous scan: if the directory changed since the last scan finished, newly completed jobs have arrived, and the moveToDone thread moves their files from the intermediate directory to the done directory.
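Both directories are configurable; a brief sketch of the keys (standard JHAdminConfig keys; the paths shown match the ones used in this article):
import org.apache.hadoop.conf.Configuration;
public class HistoryDirsSketch {
public static void main(String[] args) {
Configuration conf = new Configuration();
// Where the AM drops finished-job files (the "intermediate" dir):
conf.set("mapreduce.jobhistory.intermediate-done-dir", "/mr-history/tmp");
// Where the JobHistoryServer's moveToDone thread files them permanently:
conf.set("mapreduce.jobhistory.done-dir", "/mr-history/done");
}
}
With that in mind, processDoneFiles() below does the actual move into the intermediate directory: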
protected void processDoneFiles(JobId jobId) throws IOException {
//as mentioned above, this map holds historical job metadata (jobs in every state)
final MetaInfo mi = fileMap.get(jobId);
if (mi == null) {
throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
}
if (mi.getHistoryFile() == null) {
LOG.warn("No file for job-history with " + jobId + " found in cache!");
}
if (mi.getConfFile() == null) {
LOG.warn("No file for jobconf with " + jobId + " found in cache!");
}
// Writing out the summary file.
// TODO JH enhancement - reuse this file to store additional indexing info
// like ACLs, etc. JHServer can use HDFS append to build an index file
// with more info than is available via the filename.
Path qualifiedSummaryDoneFile = null;
FSDataOutputStream summaryFileOut = null;
//move summary.xml file to intermediate dir
try {
String doneSummaryFileName = getTempFileName(JobHistoryUtils.getIntermediateSummaryFileName(jobId));
//doneDirPrefixPath is actually the intermediate-done directory, /mr-history/tmp
qualifiedSummaryDoneFile = doneDirFS.makeQualified(new Path(doneDirPrefixPath, doneSummaryFileName));
summaryFileOut = doneDirFS.create(qualifiedSummaryDoneFile, true);
summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString());
summaryFileOut.close();
doneDirFS.setPermission(qualifiedSummaryDoneFile, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
} catch (IOException e) {
LOG.info("Unable to write out JobSummaryInfo to ["
+ qualifiedSummaryDoneFile + "]", e);
throw e;
}
try {
// Move historyFile to intermediate Folder.
Path qualifiedDoneFile = null;
if (mi.getHistoryFile() != null) {
Path historyFile = mi.getHistoryFile();
Path qualifiedLogFile = stagingDirFS.makeQualified(historyFile);
int jobNameLimit =
getConfig().getInt(JHAdminConfig.MR_HS_JOBNAME_LIMIT,
JHAdminConfig.DEFAULT_MR_HS_JOBNAME_LIMIT);
String doneJobHistoryFileName =
getTempFileName(FileNameIndexUtils.getDoneFileName(mi
.getJobIndexInfo(), jobNameLimit));
qualifiedDoneFile = doneDirFS.makeQualified(new Path(doneDirPrefixPath,
doneJobHistoryFileName));
if(moveToDoneNow(qualifiedLogFile, qualifiedDoneFile)) {
String historyUrl = MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(
getConfig(), context.getApplicationID());
context.setHistoryUrl(historyUrl);
LOG.info("Set historyUrl to " + historyUrl);
}
}
// Move confFile to intermediate Folder
Path qualifiedConfDoneFile = null;
if (mi.getConfFile() != null) {
Path confFile = mi.getConfFile();
Path qualifiedConfFile = stagingDirFS.makeQualified(confFile);
String doneConfFileName = getTempFileName(JobHistoryUtils.getIntermediateConfFileName(jobId));
qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(doneDirPrefixPath, doneConfFileName));
moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
}
//copy the job files uploaded to the tmp directory above into the done directory
moveTmpToDone(qualifiedSummaryDoneFile);
moveTmpToDone(qualifiedConfDoneFile);
moveTmpToDone(qualifiedDoneFile);
} catch (IOException e) {
LOG.error("Error closing writer for JobID: " + jobId);
throw e;
}
}