[Hadoop][Yarn] The NodeManager Heartbeat Mechanism

沙漠里的果果酱, posted 2023/08/07 17:38:18

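In Hadoop, a Service usually wraps a thread or a thread pool that implements one independent function, and NodeStatusUpdaterImpl is no exception.
This is where the NodeManager (NM) sends its heartbeats to the ResourceManager (RM).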

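The start method of this Service mainly does two things:
1-Registers this NM with the RM, including its container information and the log aggregation status of its applications;
2-Starts the Node Status Updater thread, which periodically reports heartbeats to the RM;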
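
The two-step start sequence can be sketched with plain JDK classes. This is only an illustration: `HeartbeatServiceSketch`, `registerWithRM` and the `steps` list are hypothetical names, the 10 ms sleep stands in for the real heartbeat interval, and nothing here calls Hadoop. The point is the ordering: register once, then hand the periodic work to a named thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Hadoop-style service; not the real NodeStatusUpdaterImpl.
class HeartbeatServiceSketch {
    // Records the order in which the steps ran (thread-safe for the demo).
    final List<String> steps = new CopyOnWriteArrayList<>();
    private final CountDownLatch firstBeat = new CountDownLatch(1);
    private volatile boolean stopped = false;
    private Thread updater;

    // Step 1 (assumed shape): one-off registration before any heartbeat is sent.
    private void registerWithRM() {
        steps.add("register");
    }

    // Step 2: start the periodic updater thread.
    private void startStatusUpdater() {
        updater = new Thread(() -> {
            while (!stopped) {
                steps.add("heartbeat");
                firstBeat.countDown();
                try {
                    Thread.sleep(10); // stand-in for the heartbeat interval
                } catch (InterruptedException e) {
                    return; // stop() interrupts the sleep
                }
            }
        }, "Node Status Updater");
        updater.setDaemon(true);
        updater.start();
    }

    void start() {
        registerWithRM();
        startStatusUpdater();
    }

    // Returns true once at least one heartbeat has been recorded.
    boolean awaitFirstHeartbeat() {
        try {
            return firstBeat.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void stop() {
        stopped = true;
        updater.interrupt();
        try {
            updater.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling `start()`, waiting for the first heartbeat and then calling `stop()` leaves `steps` beginning with `register` followed by `heartbeat`.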

The rest of this article looks at what StatusUpdaterRunnable, the thread the NodeManager uses to report heartbeats periodically, actually does.

NodeManager.serviceInit()
---NodeManager.createNodeStatusUpdater()
------new NodeStatusUpdaterImpl()
---------NodeStatusUpdaterImpl.serviceStart()
------------NodeStatusUpdaterImpl.startStatusUpdater()
---------------new StatusUpdaterRunnable()//starts a thread named "Node Status Updater" that periodically reports the current NodeManager node's status to the ResourceManager;
------------------StatusUpdaterRunnable.run()
---------------------ResourceTrackerPBClientImpl.nodeHeartbeat()
------------------------ResourceTrackerService.nodeHeartbeat()

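NodeStatusUpdater is one of the Services inside the NodeManager: it is initialized when the NodeManager is initialized and started when the NodeManager starts. Like the Services described above, it does its work on a thread, whose run() method is implemented as follows: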

    public void run() {
      int lastHeartbeatID = 0;
      boolean missedHearbeat = false;
      while (!isStopped) {
        // Send heartbeat
        try {
          NodeHeartbeatResponse response = null;
          Set<NodeLabel> nodeLabelsForHeartbeat =
              nodeLabelsHandler.getNodeLabelsForHeartbeat();
          Set<NodeAttribute> nodeAttributesForHeartbeat =
                  nodeAttributesHandler.getNodeAttributesForHeartbeat();
          NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
          NodeHeartbeatRequest request =
              NodeHeartbeatRequest.newInstance(nodeStatus,
                  NodeStatusUpdaterImpl.this.context
                      .getContainerTokenSecretManager().getCurrentKey(),
                  NodeStatusUpdaterImpl.this.context
                      .getNMTokenSecretManager().getCurrentKey(),
                  nodeLabelsForHeartbeat,
                  nodeAttributesForHeartbeat,
                  NodeStatusUpdaterImpl.this.context
                      .getRegisteringCollectors());

          if (logAggregationEnabled) {
            // pull log aggregation status for application running in this NM
            List<LogAggregationReport> logAggregationReports =
                getLogAggregationReportsForApps(context
                    .getLogAggregationStatusForApps());
            if (logAggregationReports != null
                && !logAggregationReports.isEmpty()) {
              request.setLogAggregationReportsForApps(logAggregationReports);
            }
          }

          request.setTokenSequenceNo(
              NodeStatusUpdaterImpl.this.tokenSequenceNo);
          // That is the full content of one heartbeat. Roughly:
          // 1- the node's node label information;
          // 2- the node's other attribute information;
          // 3- the node's health status;
          // 4- the current master keys of the container and NM token secret
          //    managers (both were handed down by the RM in earlier heartbeats);
          // 5- the log aggregation status of the applications running on this node.
          response = resourceTracker.nodeHeartbeat(request);
          //get next heartbeat interval from response
          nextHeartBeatInterval = response.getNextHeartBeatInterval();
          updateMasterKeys(response);

          if (!handleShutdownOrResyncCommand(response)) {
            nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
                response);
            nodeAttributesHandler
                .verifyRMHeartbeatResponseForNodeAttributes(response);

            // Explicitly put this method after checking the resync
            // response. We
            // don't want to remove the completed containers before resync
            // because these completed containers will be reported back to RM
            // when NM re-registers with RM.
            // Only remove the cleanedup containers that are acked
            removeOrTrackCompletedContainersFromContext(response
                .getContainersToBeRemovedFromNM());

            // If the last heartbeat was missed, it is possible that the
            // RM saw this one as a duplicate and did not process it.
            // If so, we can fail to notify the RM of these completed containers
            // on the next heartbeat if we clear pendingCompletedContainers.
            // If it wasn't a duplicate, the only impact is we might notify
            // the RM twice, which it can handle.
            if (!missedHearbeat) {
              pendingCompletedContainers.clear();
            } else {
              LOG.info("skipped clearing pending completed containers due to " +
                  "missed heartbeat");
              missedHearbeat = false;
            }

            logAggregationReportForAppsTempList.clear();
            lastHeartbeatID = response.getResponseId();
            List<ContainerId> containersToCleanup = response
                .getContainersToCleanup();
            if (!containersToCleanup.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedContainersEvent(containersToCleanup,
                      CMgrCompletedContainersEvent.Reason
                          .BY_RESOURCEMANAGER));
            }
            List<ApplicationId> appsToCleanup =
                response.getApplicationsToCleanup();
            //Only start tracking for keepAlive on FINISH_APP
            trackAppsForKeepAlive(appsToCleanup);
            if (!appsToCleanup.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedAppsEvent(appsToCleanup,
                      CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
            }
            Map<ApplicationId, ByteBuffer> systemCredentials =
                YarnServerBuilderUtils.convertFromProtoFormat(
                    response.getSystemCredentialsForApps());
            if (systemCredentials != null && !systemCredentials.isEmpty()) {
              ((NMContext) context).setSystemCrendentialsForApps(
                  parseCredentials(systemCredentials));
              context.getContainerManager().handleCredentialUpdate();
            }
            List<org.apache.hadoop.yarn.api.records.Container>
                containersToUpdate = response.getContainersToUpdate();
            if (!containersToUpdate.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrUpdateContainersEvent(containersToUpdate));
            }

            // SignalContainer request originally comes from end users via
            // ClientRMProtocol's SignalContainer. Forward the request to
            // ContainerManager which will dispatch the event to
            // ContainerLauncher.
            List<SignalContainerRequest> containersToSignal = response
                .getContainersToSignalList();
            if (!containersToSignal.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrSignalContainersEvent(containersToSignal));
            }

            // Update QueuingLimits if ContainerManager supports queuing
            ContainerQueuingLimit queuingLimit =
                response.getContainerQueuingLimit();
            if (queuingLimit != null) {
              context.getContainerManager().updateQueuingLimit(queuingLimit);
            }
          }
          // Handling node resource update case.
          Resource newResource = response.getResource();
          if (newResource != null) {
            updateNMResource(newResource);
            LOG.debug("Node's resource is updated to {}", newResource);
            if (!totalResource.equals(newResource)) {
              LOG.info("Node's resource is updated to {}", newResource);
            }
          }
          if (timelineServiceV2Enabled) {
            updateTimelineCollectorData(response);
          }

          NodeStatusUpdaterImpl.this.tokenSequenceNo =
              response.getTokenSequenceNo();
        } catch (ConnectException e) {
          //catch and throw the exception if tried MAX wait time to connect RM
          dispatcher.getEventHandler().handle(
              new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
          // failed to connect to RM.
          failedToConnect = true;
          throw new YarnRuntimeException(e);
        } catch (Exception e) {

          // TODO Better error handling. Thread can die with the rest of the
          // NM still running.
          LOG.error("Caught exception in status-updater", e);
          missedHearbeat = true;
        } finally {
          synchronized (heartbeatMonitor) {
            nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
                YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
                nextHeartBeatInterval;
            try {
              heartbeatMonitor.wait(nextHeartBeatInterval);
            } catch (InterruptedException e) {
              // Do Nothing
            }
          }
        }
      }
    }
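
The `finally` block at the end of the loop is worth isolating: it falls back to the default interval when the response supplied none, then waits on a monitor rather than sleeping, so another thread can cut the wait short by notifying that monitor. Below is a minimal sketch of that pattern. `HeartbeatWaitSketch`, `requestImmediateBeat` and the `wakeRequested` flag are hypothetical additions for illustration (the flag guards against a notify that arrives before the wait starts); the 1000 ms constant mirrors the default interval mentioned in this article.

```java
// Hypothetical illustration of "wait with fallback interval"; not Hadoop code.
class HeartbeatWaitSketch {
    // Mirrors the 1000 ms default heartbeat interval described in the article.
    static final long DEFAULT_INTERVAL_MS = 1000L;

    private final Object heartbeatMonitor = new Object();
    private boolean wakeRequested = false; // guarded by heartbeatMonitor

    // A non-positive interval from the response means "use the default".
    static long effectiveInterval(long fromResponse) {
        return fromResponse <= 0 ? DEFAULT_INTERVAL_MS : fromResponse;
    }

    // Blocks for up to the effective interval; returns true if woken early.
    boolean waitForNextBeat(long fromResponse) {
        long timeout = effectiveInterval(fromResponse);
        synchronized (heartbeatMonitor) {
            if (!wakeRequested) {
                try {
                    heartbeatMonitor.wait(timeout);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            boolean early = wakeRequested;
            wakeRequested = false;
            return early;
        }
    }

    // Lets another thread trigger the next heartbeat without waiting out the interval.
    void requestImmediateBeat() {
        synchronized (heartbeatMonitor) {
            wakeRequested = true;
            heartbeatMonitor.notify();
        }
    }
}
```

With this shape a plain timeout and an early wake-up are distinguishable by the return value, which the original loop does not need but which makes the behaviour easy to check.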

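The thread mainly does the following:

  1. Prepares the heartbeat payload: mainly the node status (node ID, response ID, status of the containers running on the NodeManager, the applications still alive, node health, node resource utilization, and the resource utilization of the containers running on the node) plus the node's node labels;
  2. Packs that data into a NodeHeartbeatRequest object and sends the nodeHeartbeat request to the ResourceManager;
  3. Acts on the response returned by the ResourceManager;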

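On receiving the response, the NodeManager takes the following actions:

  1. The ResourceManager adjusts each NodeManager's heartbeat interval dynamically based on cluster and node load. The default is 1000 ms; if the response carries a nextHeartBeatInterval, the NodeManager's heartbeat interval is updated to it;
  2. The response also carries the command the NodeManager should execute. NodeAction has three values: NORMAL, a normal heartbeat reply; RESYNC, the NodeManager is out of sync with the ResourceManager and must resynchronize; SHUTDOWN, stop this NodeManager;
  3. For SHUTDOWN and RESYNC the rest of the response handling in this loop is skipped; the steps below apply to the NORMAL case. First, check whether the ResourceManager accepted the node labels reported in the heartbeat. If it did not, no exception is thrown; only an error is logged;
  4. Check whether the node attributes reported in the heartbeat were accepted. Again, a failure does not make the thread exit; only an error is logged;
  5. Record and remove the containers that the response lists as to be removed from this NM;
  6. Clean up the containers that the response lists as to be cleaned up. Note that containers to be removed and containers to be cleaned up are different. Containers to be removed are ones the NodeManager itself saw finish; it reports them to the ResourceManager whether they succeeded or not, and the ResourceManager replies, in effect: received, I have updated the state of these containers, you may delete them. Containers to be cleaned up are ones the ResourceManager proactively names in the response, for example after an application is killed, telling the NM that they are no longer needed and can be cleaned up;
  7. If the response lists applications to clean up, clean them up;
  8. Parse the application token information in the response and update the containers' credentials (that is, their tokens) through the NodeManager's own ContainerManager;
  9. Update the container queuing limit, which is unset by default. Just as the ResourceManager has a resource scheduler, the NodeManager has a container scheduler that maintains a queue; containers beyond the queue limit are rejected;
  10. Update the NodeManager's node resources, mainly the metrics and the container monitoring figures;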
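
Two rules from the list above are easy to lose in the full listing: non-NORMAL actions skip the rest of the handling, and the pending list of completed containers is only cleared when the previous heartbeat was not missed. The sketch below models just those two rules. `ResponseHandlingSketch` and its fields are hypothetical, container IDs are plain strings, and the local `NodeAction` enum only mimics the three values named above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the response-handling rules; not the Hadoop classes.
class ResponseHandlingSketch {
    enum NodeAction { NORMAL, RESYNC, SHUTDOWN }

    // Completed containers already reported to the RM but not yet known to be processed.
    final Set<String> pendingCompletedContainers = new LinkedHashSet<>();
    // Containers the RM asked this node to clean up.
    final List<String> cleanedUp = new ArrayList<>();
    // Set when the previous heartbeat failed with an exception.
    boolean missedHeartbeat = false;

    // Returns true if the NORMAL path ran, false if the response was RESYNC or SHUTDOWN.
    boolean handle(NodeAction action, List<String> containersToCleanup) {
        if (action != NodeAction.NORMAL) {
            return false; // skip everything else; pending containers are kept
        }
        if (!missedHeartbeat) {
            pendingCompletedContainers.clear();
        } else {
            // The RM may have treated this heartbeat as a duplicate, so keep the
            // pending list and report it once more on the next heartbeat.
            missedHeartbeat = false;
        }
        cleanedUp.addAll(containersToCleanup);
        return true;
    }
}
```

Reporting a completed container twice is harmless by design, which is why the code prefers keeping the list over risking a lost report.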

Everything above is the NodeManager side, that is, the RPC client side. What happens on the server side once the ResourceManager receives the NodeManager's heartbeat request?

ResourceTrackerService.nodeHeartbeat()
---fires an RMNodeStatusEvent
------the event is eventually handled by the RMNodeImpl state machine;
---------handled by the transition StatusUpdateWhenHealthyTransition()
------------which fires a NodeUpdateSchedulerEvent (type NODE_UPDATE)
---------------the event is eventually handled by the Yarn scheduler, e.g. CapacityScheduler (the handler is registered during ResourceManager.serviceInit)
------------------the handler calls CapacityScheduler.nodeUpdate()
---------------------AbstractYarnScheduler.completedContainer()//completedContainer is used as the example here
------------------------RMContainerImpl.handle()
---------------------------triggers the state transition RUNNING -> COMPLETED; the transition is FinishedTransition.transition()
------------------------------fires the event RMAppAttemptContainerFinishedEvent
---------------------------------the event is handled by the RMAppAttemptImpl state machine, which transitions (FINISHING -> FINISHED)
------------------------------------the transition is AMFinishingContainerFinishedTransition.transition()
---------------------------------------if the completed container is the AM container, the attempt moves to the FINISHED state;
------------------------------------------new FinalTransition(RMAppAttemptState.FINISHED).transition()
---------------------------------------------BaseFinalTransition.transition()
------------------------------------------------fires the RMAppEventType.ATTEMPT_FINISHED event
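
The chain above follows one pattern throughout: a state machine handles an event, changes state, and fires the next event for another state machine to pick up. The toy sketch below shows only that pattern, under loud assumptions: `EventChainSketch`, its queue and its `trace` list are hypothetical, the two enums carry just the states named in the chain, and none of this is the YARN dispatcher or state machine API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy event chain: container finishes -> attempt finishes -> app is notified.
class EventChainSketch {
    enum ContainerState { RUNNING, COMPLETED }
    enum AttemptState { FINISHING, FINISHED }

    // Stand-in for an event dispatcher: queued handlers run in FIFO order.
    final Queue<Runnable> eventQueue = new ArrayDeque<>();
    // Human-readable record of every transition that fired.
    final List<String> trace = new ArrayList<>();

    ContainerState containerState = ContainerState.RUNNING;
    AttemptState attemptState = AttemptState.FINISHING;

    // Container state machine: RUNNING -> COMPLETED, then notify the attempt.
    void onContainerFinished(boolean isAmContainer) {
        if (containerState != ContainerState.RUNNING) {
            return; // transition is only valid from RUNNING
        }
        containerState = ContainerState.COMPLETED;
        trace.add("container:RUNNING->COMPLETED");
        eventQueue.add(() -> onAttemptContainerFinished(isAmContainer));
    }

    // Attempt state machine: only the AM container finishing ends the attempt.
    void onAttemptContainerFinished(boolean isAmContainer) {
        if (isAmContainer && attemptState == AttemptState.FINISHING) {
            attemptState = AttemptState.FINISHED;
            trace.add("attempt:FINISHING->FINISHED");
            eventQueue.add(() -> trace.add("app:ATTEMPT_FINISHED"));
        }
    }

    // Runs queued events until none are left, including ones queued while draining.
    void drain() {
        Runnable next;
        while ((next = eventQueue.poll()) != null) {
            next.run();
        }
    }
}
```

Calling `onContainerFinished(true)` followed by `drain()` records three trace entries in order; with `false` only the container transition is recorded and the attempt stays in FINISHING.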





