[Hadoop][YARN] NodeManager Startup Source Code Analysis

沙漠里的果果酱, posted 2023/08/10 11:20:19

NodeManager.main()
---NodeManager.initAndStartNodeManager()
------NodeManager.init()
------NodeManager.start()

NodeManager is itself a Service, so it goes through the two lifecycle phases that every Service has:
1-AbstractService.init() ---> NodeManager.serviceInit()
2-AbstractService.start() ---> NodeManager.serviceStart()
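
The init() ---> serviceInit() and start() ---> serviceStart() relationship is a template method: the public lifecycle method checks and updates the state, and the subclass hook does the real work. The following is a minimal sketch of that idea, not Hadoop's actual AbstractService. The names SketchService, SketchNodeManager and LifecycleDemo are hypothetical, and the strict state checks are an assumption of this model.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the init()/start() template methods; NOT Hadoop's AbstractService.
abstract class SketchService {
    enum State { NOTINITED, INITED, STARTED }

    private State state = State.NOTINITED;
    protected final List<String> log = new ArrayList<>();

    // Public entry point: validates the state, then delegates to the subclass hook.
    final void init() {
        if (state != State.NOTINITED) {
            throw new IllegalStateException("cannot init from " + state);
        }
        serviceInit();
        state = State.INITED;
    }

    // start() is only legal after a successful init() in this model.
    final void start() {
        if (state != State.INITED) {
            throw new IllegalStateException("cannot start from " + state);
        }
        serviceStart();
        state = State.STARTED;
    }

    State getState() { return state; }
    List<String> getLog() { return log; }

    // Hooks that a concrete service overrides.
    protected abstract void serviceInit();
    protected abstract void serviceStart();
}

// Hypothetical stand-in for NodeManager: it only records which hook ran.
class SketchNodeManager extends SketchService {
    @Override protected void serviceInit() { log.add("serviceInit"); }
    @Override protected void serviceStart() { log.add("serviceStart"); }
}

class LifecycleDemo {
    public static void main(String[] args) {
        SketchNodeManager nm = new SketchNodeManager();
        nm.init();   // runs serviceInit()
        nm.start();  // runs serviceStart()
        System.out.println(nm.getLog() + " " + nm.getState());
    }
}
```

Calling start() before init() throws in this sketch, which is the kind of ordering guarantee the lifecycle wrapper exists to provide.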

The two phases are analyzed in detail below.
[Initialization]

protected void serviceInit(Configuration conf) throws Exception {
  UserGroupInformation.setConfiguration(conf);
  // 1 - Whether work-preserving recovery is enabled
  rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
      YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);

  // Start the recovery feature, which mainly means opening the LevelDB state store
  initAndStartRecoveryStore(conf);

  // Manage container tokens and NM tokens. Every TokenSecretManager is responsible for generating and verifying tokens.
  NMContainerTokenSecretManager containerTokenSecretManager = new NMContainerTokenSecretManager(conf, nmStore);
  NMTokenSecretManagerInNM nmTokenSecretManager = new NMTokenSecretManagerInNM(nmStore);

  // Try to recover tokens from LevelDB (only needed when container recovery is enabled)
  recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
  
  this.aclsManager = new ApplicationACLsManager(conf);
  this.dirsHandler = new LocalDirsHandlerService(metrics);

  // Distributed scheduling (the basis for opportunistic scheduling)
  boolean isDistSchedulingEnabled = conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);

  this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);

  ResourcePluginManager pluginManager = createResourcePluginManager();
  pluginManager.initialize(context);
  ((NMContext)context).setResourcePluginManager(pluginManager);

  // Create and initialize the configured ContainerExecutor (for example LinuxContainerExecutor), which launches containers
  ContainerExecutor exec = createContainerExecutor(conf);
  exec.init(context);

  // Every deletion is wrapped as a task and handed to DeletionService, which runs and manages it
  DeletionService del = createDeletionService(exec);
  addService(del);

  // NodeManager event dispatcher
  this.dispatcher = createNMDispatcher();

  // Node health check service
  this.nodeHealthChecker = new NodeHealthCheckerService(dirsHandler);
  addService(nodeHealthChecker);

  ((NMContext)context).setContainerExecutor(exec);
  ((NMContext)context).setDeletionService(del);
 
 
  // Node status updater (sends node status to the RM)
  nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);

  // Sync the blacklist information carried in heartbeat responses from the RM
  if (YarnConfiguration.isNodeBlacklistingEnabled(conf)) {
    BlacklistManager blacklistManager = new BlacklistManager(conf);
    ((NMContext)context).setBlacklistManager(blacklistManager);
    addService(blacklistManager);
  }

  // Dynamically refreshes node labels and reports them to the RM; two implementations: 1 - script, 2 - configuration
  nodeLabelsProvider = createNodeLabelsProvider(conf);
  if (nodeLabelsProvider != null) {
    addIfService(nodeLabelsProvider);
    nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
  }

  // Dynamically refreshes node attributes and reports them to the RM; the same two implementations apply
  nodeAttributesProvider = createNodeAttributesProvider(conf);
  if (nodeAttributesProvider != null) {
    addIfService(nodeAttributesProvider);
    nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
  }

  // Periodically monitors this node's resource information, which is reported to the RM
  nodeResourceMonitor = createNodeResourceMonitor();
  addService(nodeResourceMonitor);
  ((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);

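  // Backed by ContainerManagerImpl, which manages containers on this NodeManager: it handles container
  // requests and drives the per-container and per-application state machines (ContainerImpl, ApplicationImpl)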
  containerManager = createContainerManager(context, exec, del, nodeStatusUpdater,
      this.aclsManager, dirsHandler);
  addService(containerManager);
  ((NMContext) context).setContainerManager(containerManager);

  // Caches the log aggregation status of applications
  this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
      context);
  addService(nmLogAggregationStatusTracker);
  ((NMContext)context).setNMLogAggregationStatusTracker(
      this.nmLogAggregationStatusTracker);
  
  // NodeManager web server
  WebServer webServer = createWebServer(context, containerManager
      .getContainersMonitor(), this.aclsManager, dirsHandler);
  addService(webServer);
  ((NMContext) context).setWebServer(webServer);

  int maxAllocationsPerAMHeartbeat = conf.getInt(
      YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
      YarnConfiguration.DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
  ((NMContext) context).setQueueableContainerAllocator(new DistributedOpportunisticContainerAllocator(
          context.getContainerTokenSecretManager(),maxAllocationsPerAMHeartbeat));

  // Register two event handlers (containerManager and this NodeManager) with the central event dispatcher
  dispatcher.register(ContainerManagerEventType.class, containerManager);
  dispatcher.register(NodeManagerEventType.class, this);
  addService(dispatcher);

  pauseMonitor = new JvmPauseMonitor();
  addService(pauseMonitor);
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  DefaultMetricsSystem.initialize("NodeManager");

  if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
    this.nmCollectorService = createNMCollectorService(context);
    addService(nmCollectorService);
  }

  // StatusUpdater should be added last so that it gets started last,
  // which makes sure everything is up before registering with the RM.
  addService(nodeStatusUpdater);
  ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
  nmStore.setNodeStatusUpdater(nodeStatusUpdater);

  // Do secure login before calling init for added services.
  doSecureLogin();

  registerMXBean();

  context.getContainerExecutor().start();
  
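  // The addService() calls above only register the services; this call performs the actual
  // initialization by invoking serviceInit() on each registered service in turn.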
  super.serviceInit(conf);
}
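
The key point of serviceInit() above is that addService() only registers a child, while the final super.serviceInit(conf) walks the registered children in order. The sketch below models that with plain Java. ChildSketch, CompositeSketch and CompositeInitDemo are hypothetical names, and the model is a simplification rather than Hadoop's CompositeService.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a child service; it only records when it is initialized.
class ChildSketch {
    final String name;

    ChildSketch(String name) { this.name = name; }

    void init(List<String> trace) { trace.add("init:" + name); }
}

// Simplified composite: addService() only registers, serviceInit() does the work.
class CompositeSketch {
    private final List<ChildSketch> services = new ArrayList<>();
    final List<String> trace = new ArrayList<>();

    void addService(ChildSketch s) {
        services.add(s); // nothing is initialized at this point
    }

    void serviceInit() {
        // Children are initialized in the order they were registered.
        for (ChildSketch s : services) {
            s.init(trace);
        }
    }
}

class CompositeInitDemo {
    static List<String> run() {
        CompositeSketch nm = new CompositeSketch();
        nm.addService(new ChildSketch("DeletionService"));
        nm.addService(new ChildSketch("ContainerManager"));
        nm.addService(new ChildSketch("NodeStatusUpdater")); // registered last
        if (!nm.trace.isEmpty()) {
            throw new IllegalStateException("addService must not initialize anything");
        }
        nm.serviceInit();
        return nm.trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the order is the registration order, adding the status updater last is what makes it the last child to be initialized and started.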


[Startup]

AbstractService.start()
---CompositeService.serviceStart()
protected void serviceStart() throws Exception {
  List<Service> services = getServices();
  if (LOG.isDebugEnabled()) {
    LOG.debug(getName() + ": starting services, size=" + services.size());
  }
  for (Service service : services) {
    // start the service. If this fails that service
    // will be stopped and an exception raised
    service.start();
  }
  super.serviceStart();
}


All services added during the initialization phase are started one by one, in the order they were added.
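
The comment inside the loop says that a failed start stops the service and raises an exception. The sketch below illustrates one consequence of starting children strictly in order: services registered after the failing one are never started. StartOrderSketch and Child are hypothetical names. Stopping the already-started children in reverse order, and recording the error instead of rethrowing it, are assumptions made for this demo and are not taken from the code quoted in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StartOrderSketch {
    // Hypothetical child: records start/stop and can be told to fail on start.
    static class Child {
        final String name;
        final boolean failOnStart;
        boolean started;

        Child(String name, boolean failOnStart) {
            this.name = name;
            this.failOnStart = failOnStart;
        }

        void start(List<String> trace) {
            if (failOnStart) {
                throw new IllegalStateException(name + " failed to start");
            }
            started = true;
            trace.add("start:" + name);
        }

        void stop(List<String> trace) {
            if (started) { // only stop what actually started
                started = false;
                trace.add("stop:" + name);
            }
        }
    }

    // Starts children in list order; on failure, stops started ones in reverse order.
    static List<String> startAll(List<Child> children) {
        List<String> trace = new ArrayList<>();
        try {
            for (Child c : children) {
                c.start(trace);
            }
        } catch (RuntimeException e) {
            trace.add("error:" + e.getMessage());
            for (int i = children.size() - 1; i >= 0; i--) {
                children.get(i).stop(trace);
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        List<Child> children = Arrays.asList(
            new Child("A", false), new Child("B", true), new Child("C", false));
        System.out.println(startAll(children));
    }
}
```

In this run the third child is never started, because the loop stops at the first failure.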
