[Hadoop][Yarn] The NodeManager Heartbeat Mechanism
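In Hadoop, each Service implements one self-contained piece of functionality, usually backed by a thread or a thread pool, and NodeStatusUpdaterImpl is no exception.
It is the component that sends the NodeManager's heartbeats to the ResourceManager.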
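The start method of this Service does two main things:
1- it registers this NM's container information and the log aggregation status of its applications with the RM;
2- it starts the Node Status Updater thread, which periodically sends heartbeats to the RM.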
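Below we look at what StatusUpdaterRunnable, the thread through which the NodeManager periodically reports heartbeats, actually does. The call chain is as follows: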
NodeManager.serviceInit()
---NodeManager.createNodeStatusUpdater()
------new NodeStatusUpdaterImpl()
---------NodeStatusUpdaterImpl.serviceStart()
------------NodeStatusUpdaterImpl.startStatusUpdater()
---------------new StatusUpdaterRunnable()//starts a thread named "Node Status Updater" that periodically reports the current NodeManager's status to the ResourceManager;
------------------StatusUpdaterRunnable.run()
---------------------ResourceTrackerPBClientImpl.nodeHeartbeat()
------------------------ResourceTrackerService.nodeHeartbeat()
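The two start-up steps and the thread they launch can be illustrated with a minimal Java sketch, under stated assumptions. `StatusUpdaterServiceSketch`, `registerWithRm()` and the 10 ms sleep are hypothetical stand-ins, not Hadoop APIs. The sketch only demonstrates the ordering described above: register with the RM first, then start a thread named "Node Status Updater" that keeps reporting until it is stopped.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: NOT Hadoop's AbstractService API; all names are illustrative.
class StatusUpdaterServiceSketch {
    private volatile boolean registered = false;
    private volatile boolean registeredBeforeFirstBeat = false;
    private volatile boolean stopped = false;
    private final CountDownLatch firstBeat = new CountDownLatch(1);
    private Thread updaterThread;

    // Mirrors the two steps of serviceStart(): register first, then start the updater.
    void start() {
        registerWithRm();
        startStatusUpdater();
    }

    private void registerWithRm() {
        // A real NM sends a registration RPC here (container statuses, running apps, ...).
        registered = true;
    }

    private void startStatusUpdater() {
        updaterThread = new Thread(() -> {
            while (!stopped) {
                if (firstBeat.getCount() > 0) {
                    registeredBeforeFirstBeat = registered; // record the ordering for the demo
                    firstBeat.countDown();
                }
                // A real loop would build and send a heartbeat here.
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // stop() interrupts the sleep
                }
            }
        }, "Node Status Updater");
        updaterThread.setDaemon(true); // sketch choice: never keep the JVM alive
        updaterThread.start();
    }

    boolean awaitFirstHeartbeat(long timeoutMs) {
        try {
            return firstBeat.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void stop() {
        stopped = true;
        updaterThread.interrupt();
        try {
            updaterThread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRegistered() { return registered; }
    boolean wasRegisteredBeforeFirstBeat() { return registeredBeforeFirstBeat; }
    String updaterThreadName() { return updaterThread.getName(); }
    boolean isUpdaterAlive() { return updaterThread.isAlive(); }
}
```

Registration happens synchronously inside `start()`, before the thread exists. As a result, the first heartbeat can never run ahead of registration.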
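NodeStatusUpdater is one of the NodeManager's Services. It is initialized when the NodeManager is initialized and started when the NodeManager starts. Like other Services, it does its work on a thread; the heartbeat loop lives in StatusUpdaterRunnable.run(), implemented as follows: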
```java
public void run() {
  int lastHeartbeatID = 0;
  boolean missedHearbeat = false;
  while (!isStopped) {
    // Send heartbeat
    try {
      NodeHeartbeatResponse response = null;
      Set<NodeLabel> nodeLabelsForHeartbeat =
          nodeLabelsHandler.getNodeLabelsForHeartbeat();
      Set<NodeAttribute> nodeAttributesForHeartbeat =
          nodeAttributesHandler.getNodeAttributesForHeartbeat();
      NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
      NodeHeartbeatRequest request =
          NodeHeartbeatRequest.newInstance(nodeStatus,
              NodeStatusUpdaterImpl.this.context
                  .getContainerTokenSecretManager().getCurrentKey(),
              NodeStatusUpdaterImpl.this.context
                  .getNMTokenSecretManager().getCurrentKey(),
              nodeLabelsForHeartbeat,
              nodeAttributesForHeartbeat,
              NodeStatusUpdaterImpl.this.context
                  .getRegisteringCollectors());
      if (logAggregationEnabled) {
        // pull log aggregation status for application running in this NM
        List<LogAggregationReport> logAggregationReports =
            getLogAggregationReportsForApps(context
                .getLogAggregationStatusForApps());
        if (logAggregationReports != null
            && !logAggregationReports.isEmpty()) {
          request.setLogAggregationReportsForApps(logAggregationReports);
        }
      }
      request.setTokenSequenceNo(
          NodeStatusUpdaterImpl.this.tokenSequenceNo);
      // That is the complete content of one heartbeat. Roughly, it carries:
      // 1- the node's node labels;
      // 2- the node's other attributes;
      // 3- the node status, including its health information;
      // 4- the current master keys of the container and NM token secret
      //    managers (both keys originally come down from the RM in
      //    heartbeat responses);
      // 5- the log aggregation status of applications running on this node.
      response = resourceTracker.nodeHeartbeat(request);
      //get next heartbeat interval from response
      nextHeartBeatInterval = response.getNextHeartBeatInterval();
      updateMasterKeys(response);
      if (!handleShutdownOrResyncCommand(response)) {
        nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
            response);
        nodeAttributesHandler
            .verifyRMHeartbeatResponseForNodeAttributes(response);
        // Explicitly put this method after checking the resync
        // response. We
        // don't want to remove the completed containers before resync
        // because these completed containers will be reported back to RM
        // when NM re-registers with RM.
        // Only remove the cleanedup containers that are acked
        removeOrTrackCompletedContainersFromContext(response
            .getContainersToBeRemovedFromNM());
        // If the last heartbeat was missed, it is possible that the
        // RM saw this one as a duplicate and did not process it.
        // If so, we can fail to notify the RM of these completed containers
        // on the next heartbeat if we clear pendingCompletedContainers.
        // If it wasn't a duplicate, the only impact is we might notify
        // the RM twice, which it can handle.
        if (!missedHearbeat) {
          pendingCompletedContainers.clear();
        } else {
          LOG.info("skipped clearing pending completed containers due to " +
              "missed heartbeat");
          missedHearbeat = false;
        }
        logAggregationReportForAppsTempList.clear();
        lastHeartbeatID = response.getResponseId();
        List<ContainerId> containersToCleanup = response
            .getContainersToCleanup();
        if (!containersToCleanup.isEmpty()) {
          dispatcher.getEventHandler().handle(
              new CMgrCompletedContainersEvent(containersToCleanup,
                  CMgrCompletedContainersEvent.Reason
                      .BY_RESOURCEMANAGER));
        }
        List<ApplicationId> appsToCleanup =
            response.getApplicationsToCleanup();
        //Only start tracking for keepAlive on FINISH_APP
        trackAppsForKeepAlive(appsToCleanup);
        if (!appsToCleanup.isEmpty()) {
          dispatcher.getEventHandler().handle(
              new CMgrCompletedAppsEvent(appsToCleanup,
                  CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
        }
        Map<ApplicationId, ByteBuffer> systemCredentials =
            YarnServerBuilderUtils.convertFromProtoFormat(
                response.getSystemCredentialsForApps());
        if (systemCredentials != null && !systemCredentials.isEmpty()) {
          ((NMContext) context).setSystemCrendentialsForApps(
              parseCredentials(systemCredentials));
          context.getContainerManager().handleCredentialUpdate();
        }
        List<org.apache.hadoop.yarn.api.records.Container>
            containersToUpdate = response.getContainersToUpdate();
        if (!containersToUpdate.isEmpty()) {
          dispatcher.getEventHandler().handle(
              new CMgrUpdateContainersEvent(containersToUpdate));
        }
        // SignalContainer request originally comes from end users via
        // ClientRMProtocol's SignalContainer. Forward the request to
        // ContainerManager which will dispatch the event to
        // ContainerLauncher.
        List<SignalContainerRequest> containersToSignal = response
            .getContainersToSignalList();
        if (!containersToSignal.isEmpty()) {
          dispatcher.getEventHandler().handle(
              new CMgrSignalContainersEvent(containersToSignal));
        }
        // Update QueuingLimits if ContainerManager supports queuing
        ContainerQueuingLimit queuingLimit =
            response.getContainerQueuingLimit();
        if (queuingLimit != null) {
          context.getContainerManager().updateQueuingLimit(queuingLimit);
        }
      }
      // Handling node resource update case.
      Resource newResource = response.getResource();
      if (newResource != null) {
        updateNMResource(newResource);
        LOG.debug("Node's resource is updated to {}", newResource);
        if (!totalResource.equals(newResource)) {
          LOG.info("Node's resource is updated to {}", newResource);
        }
      }
      if (timelineServiceV2Enabled) {
        updateTimelineCollectorData(response);
      }
      NodeStatusUpdaterImpl.this.tokenSequenceNo =
          response.getTokenSequenceNo();
    } catch (ConnectException e) {
      //catch and throw the exception if tried MAX wait time to connect RM
      dispatcher.getEventHandler().handle(
          new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
      // failed to connect to RM.
      failedToConnect = true;
      throw new YarnRuntimeException(e);
    } catch (Exception e) {
      // TODO Better error handling. Thread can die with the rest of the
      // NM still running.
      LOG.error("Caught exception in status-updater", e);
      missedHearbeat = true;
    } finally {
      synchronized (heartbeatMonitor) {
        nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
            YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
            nextHeartBeatInterval;
        try {
          heartbeatMonitor.wait(nextHeartBeatInterval);
        } catch (InterruptedException e) {
          // Do Nothing
        }
      }
    }
  }
}
```
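The thread mainly does the following:
- It prepares the heartbeat payload. This mainly consists of the node status (node ID, response ID, the status of the containers running on the NodeManager, the live applications, the node's health status, the node's resource utilization, and the resource utilization of the containers running on it) plus the node's node labels;
- It packs that data into a NodeHeartbeatRequest object and sends a nodeHeartbeat request to the ResourceManager;
- It then takes the corresponding actions based on the ResourceManager's response.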
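The loop summarized above can be modeled in a few dozen lines. The sketch below is a simplified illustration, not Hadoop code: the `Request`/`Response` records and the `Function` that plays the RM are hypothetical. It keeps three behaviors of the real `run()` method:
- a non-positive interval falls back to the 1000 ms default;
- a failed heartbeat sets a missed-heartbeat flag;
- after a miss, the pending completed containers are kept for one more round, because the RM may have treated the next heartbeat as a duplicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, heavily simplified model of the StatusUpdaterRunnable loop.
// Request/Response and the Function-based "tracker" are stand-ins, not Hadoop types.
class HeartbeatLoopSketch {
    // Same value as YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS.
    static final long DEFAULT_INTERVAL_MS = 1000L;

    record Request(int lastResponseId, List<String> completedContainers) {}
    record Response(int responseId, long nextIntervalMs) {}

    private final Object heartbeatMonitor = new Object();
    private final List<String> pendingCompleted = new ArrayList<>();
    private final Function<Request, Response> resourceTracker; // stands in for the RPC proxy
    private int lastHeartbeatId = 0;
    private boolean missedHeartbeat = false;
    private long nextIntervalMs;

    HeartbeatLoopSketch(Function<Request, Response> resourceTracker, long initialIntervalMs) {
        this.resourceTracker = resourceTracker;
        this.nextIntervalMs = initialIntervalMs;
    }

    // Same fallback as the finally block: non-positive intervals use the default.
    static long effectiveInterval(long next) {
        return next <= 0 ? DEFAULT_INTERVAL_MS : next;
    }

    void addCompleted(String containerId) { pendingCompleted.add(containerId); }
    int lastHeartbeatId() { return lastHeartbeatId; }
    List<String> pending() { return List.copyOf(pendingCompleted); }

    // Runs a bounded number of iterations (the real loop runs until stopped).
    void run(int iterations) {
        for (int i = 0; i < iterations; i++) {
            try {
                // Completed containers are re-reported until a heartbeat clears them.
                Request req = new Request(lastHeartbeatId, List.copyOf(pendingCompleted));
                Response resp = resourceTracker.apply(req);
                nextIntervalMs = resp.nextIntervalMs();
                if (!missedHeartbeat) {
                    pendingCompleted.clear();
                } else {
                    // The RM may have treated this one as a duplicate: report again next time.
                    missedHeartbeat = false;
                }
                lastHeartbeatId = resp.responseId();
            } catch (RuntimeException e) {
                missedHeartbeat = true; // plays the role of the catch (Exception e) branch
            }
            synchronized (heartbeatMonitor) {
                try {
                    heartbeatMonitor.wait(effectiveInterval(nextIntervalMs));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
```

Consider a tracker that fails on its second call:
- the first successful beat clears the pending list;
- the failed beat leaves it untouched;
- the beat right after the failure still keeps it;
- the beat after that clears it.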
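After receiving the response, the thread does the following:
- It updates the heartbeat interval. The ResourceManager returns the next heartbeat interval in each response and can adjust it per NodeManager according to the load of the cluster and the node. The default is 1000 ms; if the response carries nextHeartBeatInterval, this NodeManager's heartbeat interval is updated accordingly;
- The response also carries a command for the NodeManager to execute. NodeAction has three types:
  - NORMAL: a normal heartbeat response;
  - RESYNC: this NodeManager is out of sync with the ResourceManager and must resynchronize;
  - SHUTDOWN: stop this NodeManager;
- SHUTDOWN and RESYNC are handed off by handleShutdownOrResyncCommand(), and the rest of the response processing is skipped, so most of the work happens in the NORMAL case. First, the thread verifies whether the ResourceManager accepted the node labels reported in the heartbeat; even if they were rejected, no exception is thrown, only an error is logged;
- It verifies whether the node attributes reported in the heartbeat were accepted; again, a problem does not cause an exceptional exit, only an error log;
- It records and removes the containers that the response lists as to be removed from this NM;
- It cleans up the containers that the response lists for cleanup. Note that containers to be removed and containers to be cleaned up are different:
  - Containers to be removed are ones the NodeManager itself has detected as finished. Whether they succeeded or not, they are reported to the ResourceManager, which replies, in effect: "Got it, I've updated the status of these containers, you can remove them now."
  - Containers to be cleaned up are ones the ResourceManager proactively names in the response, for example when an application is killed. These containers are no longer needed and can be cleaned up;
- If the response lists applications to clean up, it cleans up those applications;
- It parses the application tokens returned in the response and has the NodeManager's ContainerManager update the containers' credentials, i.e. their tokens;
- It updates the container queuing limit, which is absent by default. Just as the ResourceManager has a resource scheduler, the NodeManager has a container scheduler. That scheduler maintains a queue, and containers beyond the queue limit are rejected;
- It updates the NodeManager's node resources, mainly the metrics and the container-monitoring figures.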
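Two distinctions in the list above can be sketched as follows:
- the NORMAL path versus the SHUTDOWN/RESYNC path;
- acknowledged removal versus RM-initiated cleanup.

This is a hypothetical model. Only the three `NodeAction` values come from the text; the `Response` record, the string container IDs and the fields are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of how the NM side could act on a heartbeat response.
// NodeAction mirrors the three values described above; everything else is illustrative.
class ResponseHandlingSketch {
    enum NodeAction { NORMAL, RESYNC, SHUTDOWN }

    record Response(NodeAction action,
                    List<String> containersToBeRemovedFromNm,
                    List<String> containersToCleanup) {}

    // Containers still tracked locally, including finished ones awaiting the RM's ack.
    final Set<String> contextContainers = new LinkedHashSet<>();
    // Containers the RM asked us to clean up (a real NM dispatches an event for them).
    final List<String> cleanupRequested = new ArrayList<>();
    // NodeManager-level commands that were handed off.
    final List<String> nodeEvents = new ArrayList<>();

    // Plays the role of handleShutdownOrResyncCommand(): true means "stop here".
    boolean handleShutdownOrResync(Response r) {
        if (r.action() == NodeAction.NORMAL) {
            return false;
        }
        nodeEvents.add(r.action().name());
        return true;
    }

    void handle(Response r) {
        if (handleShutdownOrResync(r)) {
            // Keep completed containers so they can be re-reported after re-registering.
            return;
        }
        // Ack path: the NM reported these as finished and the RM has recorded them.
        r.containersToBeRemovedFromNm().forEach(contextContainers::remove);
        // Cleanup path: initiated by the RM (e.g. the app was killed); the containers
        // may still be running, so here they are only queued for cleanup.
        cleanupRequested.addAll(r.containersToCleanup());
    }
}
```

A container named for cleanup stays in the local context until it actually finishes and the RM acknowledges it in a later heartbeat. That is why the two lists are kept separate.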
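The container queuing limit can be pictured as a bounded queue whose cap is refreshed from the heartbeat response. This is a hypothetical sketch, not the NodeManager's container scheduler:
- `QueuingLimitSketch` is made up;
- a `null` limit stands for a response that carries no queuing limit.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: a container queue whose length cap can be updated from a
// heartbeat response. Not Hadoop's ContainerScheduler; names are made up.
class QueuingLimitSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private int maxQueueLength;

    QueuingLimitSketch(int initialMax) { this.maxQueueLength = initialMax; }

    // A null limit models "the response carried no queuing limit": keep the current one.
    void updateQueuingLimit(Integer newMax) {
        if (newMax != null) {
            maxQueueLength = newMax;
        }
    }

    // Returns false (rejected) once the queue is full.
    boolean enqueue(String containerId) {
        if (queue.size() >= maxQueueLength) {
            return false;
        }
        queue.addLast(containerId);
        return true;
    }

    int size() { return queue.size(); }
}
```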
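All of the above is NodeManager-side behavior, i.e. the behavior of the RPC client. So what happens on the server side once the ResourceManager receives a NodeManager's heartbeat request?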
ResourceTrackerService.nodeHeartbeat()
---fires an RMNodeStatusEvent
------the event is ultimately caught by the RMNodeImpl state machine;
---------and handled by the transition StatusUpdateWhenHealthyTransition()
------------which fires a NodeUpdateSchedulerEvent (type NODE_UPDATE)
---------------this event is ultimately handled by the Yarn scheduler, e.g. CapacityScheduler (the event handler is registered during ResourceManager.serviceInit)
------------------the handler calls CapacityScheduler.nodeUpdate()
---------------------AbstractYarnScheduler.completedContainer()//completedContainer is used as the example here
------------------------RMContainerImpl.handle()
---------------------------triggers the state transition RUNNING -----> COMPLETED, via FinishedTransition.transition()
------------------------------which fires the event RMAppAttemptContainerFinishedEvent
---------------------------------the event is caught by the RMAppAttemptImpl state machine, which transitions FINISHING -----> FINISHED
------------------------------------the transition method is AMFinishingContainerFinishedTransition.transition()
---------------------------------------if the finished container is the AM container, the attempt moves to the FINISHED state;
------------------------------------------new FinalTransition(RMAppAttemptState.FINISHED).transition()
---------------------------------------------BaseFinalTransition.transition()
------------------------------------------------fires the RMAppEventType.ATTEMPT_FINISHED event
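The tail of this chain can be modeled with two small enums: only the AM container's completion finishes a FINISHING attempt. The sketch is a simplification with several assumptions:
- the state names mirror `RMContainerState` and `RMAppAttemptState`;
- the class itself is hypothetical;
- direct method calls stand in for events;
- the handling of duplicate reports is assumed, not taken from the RM.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of the RM-side chain. State names echo
// RMContainerState / RMAppAttemptState, but this is not the RM's state machine
// or event dispatcher; events are modeled as direct method calls.
class RmSideChainSketch {
    enum ContainerState { RUNNING, COMPLETED }
    enum AttemptState { FINISHING, FINISHED }

    static final class TrackedContainer {
        final boolean isAm;
        ContainerState state = ContainerState.RUNNING;
        TrackedContainer(boolean isAm) { this.isAm = isAm; }
    }

    final Map<String, TrackedContainer> containers = new HashMap<>();
    final List<String> appEvents = new ArrayList<>();
    // Assumption: the attempt is already FINISHING (the AM has unregistered).
    AttemptState attemptState = AttemptState.FINISHING;

    void addContainer(String id, boolean isAm) {
        containers.put(id, new TrackedContainer(isAm));
    }

    // Stands in for nodeHeartbeat -> RMNodeImpl -> NODE_UPDATE -> scheduler nodeUpdate().
    void onNodeHeartbeat(List<String> completedIds) {
        for (String id : completedIds) {
            completedContainer(id);
        }
    }

    private void completedContainer(String id) {
        TrackedContainer c = containers.get(id);
        if (c == null || c.state == ContainerState.COMPLETED) {
            return; // simplification: ignore unknown or already-completed containers
        }
        c.state = ContainerState.COMPLETED; // RUNNING -> COMPLETED
        onAttemptContainerFinished(c);      // like RMAppAttemptContainerFinishedEvent
    }

    private void onAttemptContainerFinished(TrackedContainer c) {
        // Only the AM container's completion moves FINISHING -> FINISHED.
        if (attemptState == AttemptState.FINISHING && c.isAm) {
            attemptState = AttemptState.FINISHED;
            appEvents.add("ATTEMPT_FINISHED"); // like RMAppEventType.ATTEMPT_FINISHED
        }
    }
}
```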
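[Copyright Notice] This article is original content by a Huawei Cloud Community user. When reposting, you must credit the source (Huawei Cloud Community), the article link, the author, and other basic information; otherwise the author and the community reserve the right to pursue legal liability. If you find content in this community suspected of plagiarism, you are welcome to report it by email with supporting evidence; once verified, the community will immediately remove the infringing content. Report email: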
cloudbbs@huaweicloud.com