【Hadoop】【Yarn】Container Resource Isolation in the NodeManager

沙漠里的果果酱 Posted on 2023/08/10 11:12:46
【Abstract】ContainerManagerImpl is a Service inside the NodeManager that is mainly responsible for container resource isolation.

ContainerManagerImpl is a Service inside the NodeManager and is mainly responsible for container resource isolation.
For example, if a container's resource usage exceeds its limit, the container is killed.
Likewise, if a container writes too much data to its local directories, it is also killed.
(1) If virtual memory monitoring is enabled, check whether the total virtual memory used by the container's process tree exceeds its limit;
(2) If physical memory monitoring is enabled, check whether the total physical memory used by the container's process tree exceeds its limit.
Virtual and physical memory monitoring are switched on by the parameters yarn.nodemanager.vmem-check-enabled and yarn.nodemanager.pmem-check-enabled respectively; both default to true, so both checks are enabled by default.
One YARN concept is worth restating here: memory resource control in YARN is implemented by YARN itself, whereas CPU resource isolation is implemented through cgroups.
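For reference, here is a minimal sketch (not taken from the NodeManager source) of reading these two switches; the YarnConfiguration constants below map to the property names above, and the vmem-pmem ratio is included only to show where the virtual memory limit comes from:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Read the two memory-check switches from the YARN configuration.
Configuration conf = new YarnConfiguration();
boolean pmemCheckEnabled = conf.getBoolean(
    YarnConfiguration.NM_PMEM_CHECK_ENABLED,          // yarn.nodemanager.pmem-check-enabled
    YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED); // default: true
boolean vmemCheckEnabled = conf.getBoolean(
    YarnConfiguration.NM_VMEM_CHECK_ENABLED,          // yarn.nodemanager.vmem-check-enabled
    YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED); // default: true
// The virtual memory limit of a container is derived from its physical memory
// limit via yarn.nodemanager.vmem-pmem-ratio (default 2.1).
float vmemPmemRatio = conf.getFloat(
    YarnConfiguration.NM_VMEM_PMEM_RATIO,
    YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);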

In terms of code modules, memory resource isolation is implemented in ContainerManagerImpl, while cgroup-based isolation is implemented in LinuxContainerExecutor.
This article focuses on YARN's memory resource isolation, or more precisely, container memory usage control.
NodeManager itself is a Service that contains many smaller services implementing its functional modules. They are all wired up in NodeManager's own service initialization method:
NodeManager.serviceInit()

ContainerManagerImpl containerManager = createContainerManager(context, exec, del, nodeStatusUpdater, this.aclsManager, dirsHandler);


NodeManager.createContainerManager()

protected ContainerManagerImpl createContainerManager(Context context,
    ContainerExecutor exec, DeletionService del,
    NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
    LocalDirsHandlerService dirsHandler) {
  return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
}

ContainerManagerImpl.ContainerManagerImpl()
public ContainerManagerImpl(Context context, ContainerExecutor exec,
    DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
    NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
  super(ContainerManagerImpl.class.getName());
  this.context = context;
  this.dirsHandler = dirsHandler;

  // Central asynchronous event dispatcher at the ContainerManager level
  dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher");
  this.deletionService = deletionContext;
  this.metrics = metrics;

  rsrcLocalizationSrvc = createResourceLocalizationService(exec, deletionContext, context, metrics);
  addService(rsrcLocalizationSrvc);

  containersLauncher = createContainersLauncher(context, exec);
  addService(containersLauncher);

  this.nodeStatusUpdater = nodeStatusUpdater;
  this.containerScheduler = createContainerScheduler(context);
  addService(containerScheduler);

  AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = new AuxiliaryLocalPathHandlerImpl(dirsHandler);
  // Start configurable services
  auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler, this.context, this.deletionService);
  auxiliaryServices.registerServiceListener(this);
  addService(auxiliaryServices);

  // initialize the metrics publisher if the timeline service v.2 is enabled
  // and the system publisher is enabled
  Configuration conf = context.getConf();
  if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
    if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
      LOG.info("YARN system metrics publishing service is enabled");
      nmMetricsPublisher = createNMTimelinePublisher(context);
      context.setNMTimelinePublisher(nmMetricsPublisher);
    }
    this.timelineServiceV2Enabled = true;
  }
  this.containersMonitor = createContainersMonitor(exec);
  addService(this.containersMonitor);

  dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher());
  dispatcher.register(ApplicationEventType.class, createApplicationEventDispatcher());
  dispatcher.register(LocalizationEventType.class, new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc, nmMetricsPublisher));
  dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
  dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
  dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
  dispatcher.register(ContainerSchedulerEventType.class, containerScheduler);

  addService(dispatcher);

  ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  this.readLock = lock.readLock();
  this.writeLock = lock.writeLock();
}

Five services are initialized here:
1-ResourceLocalizationService: resource localization service
2-ContainersLauncher: container launching service
3-ContainerScheduler: container scheduling service
4-AuxServices: auxiliary services
5-ContainersMonitor: container monitoring service

ContainerManagerImpl.createContainersMonitor()
protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) {
  // yarn.nodemanager.container.monitor.factory.class; defaults to DefaultContainerMonitorFactory, which is almost always used.
  Class<?> factoryClass = context.getConf().getClass(YarnConfiguration.NM_CONTAINER_MONITOR_FACTORY_CLASS,
          DefaultContainerMonitorFactory.class, ContainerMonitorFactory.class);

  ContainerMonitorFactory factory = (ContainerMonitorFactory) ReflectionUtils.newInstance(factoryClass, getConfig());
  return factory.getContainerMonitor(exec, dispatcher, this.context);
}

DefaultContainerMonitorFactory.getContainerMonitor()
public ContainersMonitor getContainerMonitor(ContainerExecutor exec,
    AsyncDispatcher dispatcher, Context context) {
  return new ContainersMonitorImpl(exec, dispatcher, context);
}


ContainersMonitorImpl.serviceStart() mainly does two things:
protected void serviceStart() throws Exception {
  // If yarn.nodemanager.container-monitor.enabled is true and yarn.nodemanager.container-monitor.interval-ms > 0,
  // periodically check whether the resource usage of every container process on this NodeManager exceeds its limit.
  if (containersMonitorEnabled) {
    this.monitoringThread.start();
  }
  // yarn.nodemanager.container-log-monitor.enable
  // monitors the size of container log directories
  if (logMonitorEnabled) {
    this.logMonitorThread.start();
  }
  super.serviceStart();
}
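For reference, a minimal sketch of how these two switches might be resolved from the configuration; the property names are taken from the comments above and the defaults shown are only assumptions, so verify them against your Hadoop version:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Illustrative only: resolve the monitoring switches from the configuration.
Configuration conf = new YarnConfiguration();
boolean containersMonitorEnabled =
    conf.getBoolean("yarn.nodemanager.container-monitor.enabled", true);     // assumed default
long monitoringInterval =
    conf.getLong("yarn.nodemanager.container-monitor.interval-ms", 3000L);   // assumed default
boolean logMonitorEnabled =
    conf.getBoolean("yarn.nodemanager.container-log-monitor.enable", false); // assumed default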


Every monitoring interval, the thread checks whether the resource usage of every container process running on this NodeManager exceeds its limit. The logic is as follows:
Read /proc/<PID>/stat to obtain basic information about each container process, build each container's process tree from it, and thereby compute the total virtual and physical memory used by each process tree.
Although this gives us the memory usage (physical and virtual) of each container's process tree, we cannot decide to kill a container just because its process tree exceeds the limit at one instant, because a process tree's memory usage fluctuates. To avoid killing containers by mistake, Hadoop gives every process an "age" attribute: a newly started process has age 1, and each time the MonitoringThread runs an update, every process's age is incremented by one. On that basis, the criteria for killing a container are as follows:
If the total memory (physical or virtual) used by all processes in the container's process tree (age greater than 0) exceeds twice the limit, or the total memory used by all processes with age greater than 1 exceeds the limit, the container is considered to be over its memory limit and may be killed. So what YARN calls resource isolation here is really memory usage monitoring.
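A minimal sketch of the decision just described (the real check is ContainersMonitorImpl.isProcessTreeOverLimit(); this standalone version is simplified):

// Decide whether a container's process tree is over its memory limit.
// currentMemUsage:            memory used by the whole tree (all processes, age >= 1)
// curMemUsageOfAgedProcesses: memory used only by processes with age > 1
// memLimit:                   the container's limit (physical or virtual memory)
// A container is flagged only if the whole tree uses more than twice the limit,
// or the aged processes alone already exceed the limit; this tolerates short
// spikes caused by freshly forked child processes.
static boolean isProcessTreeOverLimit(long currentMemUsage,
                                      long curMemUsageOfAgedProcesses,
                                      long memLimit) {
  return currentMemUsage > 2 * memLimit
      || curMemUsageOfAgedProcesses > memLimit;
}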

ContainersMonitorImpl.MonitoringThread.run()

public void run() {

  while (!stopped && !Thread.currentThread().isInterrupted()) {

    // Temporary structure to calculate the total resource utilization of the containers 
    // The arguments are: physical memory, virtual memory, and CPU usage
    ResourceUtilization trackedContainersUtilization  = ResourceUtilization.newInstance(0, 0, 0.0f);

    // Now do the monitoring for the trackingContainers
    // Check memory usage and kill any overflowing containers
    long vmemUsageByAllContainers = 0;
    long pmemByAllContainers = 0;
    long cpuUsagePercentPerCoreByAllContainers = 0;
    for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers
        .entrySet()) {
      ContainerId containerId = entry.getKey();
      ProcessTreeInfo ptInfo = entry.getValue();
      try {
        // Initialize uninitialized process trees
        initializeProcessTrees(entry);

        String pId = ptInfo.getPID();

        if (pId == null || !isResourceCalculatorAvailable()) {
          continue; // processTree cannot be tracked
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Constructing ProcessTree for : PID = " + pId
              + " ContainerId = " + containerId);
        }
        ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
        // Set context, containerId, pId info to pTree
        // Which will used by the implemented class ProcfsBasedProcessTree
        // to read smaps file
        pTree.setReadFileAsUser(
            new SMAPSReadFileAsUserImpl(context, containerId, pId));
        pTree.updateProcessTree();    // update process-tree
        long currentVmemUsage = pTree.getVirtualMemorySize();
        long currentPmemUsage = pTree.getRssMemorySize();

        // if machine has 6 cores and 3 are used,
        // cpuUsagePercentPerCore should be 300%
        float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
        if (cpuUsagePercentPerCore < 0) {
          // CPU usage is not available likely because the container just
          // started. Let us skip this turn and consider this container
          // in the next iteration.
          LOG.info("Skipping monitoring container " + containerId
              + " since CPU usage is not yet available.");
          continue;
        }

        recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage,
                currentPmemUsage, trackedContainersUtilization);

        checkLimit(containerId, pId, pTree, ptInfo,
                currentVmemUsage, currentPmemUsage);

        // Accounting the total memory in usage for all containers
        vmemUsageByAllContainers += currentVmemUsage;
        pmemByAllContainers += currentPmemUsage;
        // Accounting the total cpu usage for all containers
        cpuUsagePercentPerCoreByAllContainers += cpuUsagePercentPerCore;

        reportResourceUsage(containerId, currentPmemUsage,
                cpuUsagePercentPerCore);
      } catch (Exception e) {
        // Log the exception and proceed to the next container.
        LOG.warn("Uncaught exception in ContainersMonitorImpl "
            + "while monitoring resource of {}", containerId, e);
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total Resource Usage stats in NM by all containers : "
          + "Virtual Memory= " + vmemUsageByAllContainers
          + ", Physical Memory= " + pmemByAllContainers
          + ", Total CPU usage(% per core)= "
          + cpuUsagePercentPerCoreByAllContainers);
    }

    // Save the aggregated utilization of the containers
    setContainersUtilization(trackedContainersUtilization);

    // Publish the container utilization metrics to node manager
    // metrics system.
    NodeManagerMetrics nmMetrics = context.getNodeManagerMetrics();
    if (nmMetrics != null) {
      nmMetrics.setContainerUsedMemGB(
          trackedContainersUtilization.getPhysicalMemory());
      nmMetrics.setContainerUsedVMemGB(
          trackedContainersUtilization.getVirtualMemory());
      nmMetrics.setContainerCpuUtilization(
          trackedContainersUtilization.getCPU());
    }

    try {
      Thread.sleep(monitoringInterval);
    } catch (InterruptedException e) {
      LOG.warn(ContainersMonitorImpl.class.getName()
          + " is interrupted. Exiting.");
      break;
    }
  }
}
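checkLimit() is where the kill decision is actually made: it applies the age-based rule above to the container's virtual and physical memory limits and, if a limit is exceeded, dispatches a kill event for that container. A condensed sketch follows (class and constant names such as ContainerKillEvent and ContainerExitStatus.KILLED_EXCEEDED_VMEM/PMEM follow the Hadoop source, but the surrounding logic is heavily simplified and the variable names here are illustrative):

// Condensed sketch of the kill path inside checkLimit(); the real method also
// builds a detailed diagnostic message including a process-tree dump.
if (isVmemCheckEnabled()
    && isProcessTreeOverLimit(currentVmemUsage, curVmemUsageOfAgedProcesses, vmemLimit)) {
  eventDispatcher.getEventHandler().handle(
      new ContainerKillEvent(containerId,
          ContainerExitStatus.KILLED_EXCEEDED_VMEM,
          "Container is running beyond virtual memory limits."));
} else if (isPmemCheckEnabled()
    && isProcessTreeOverLimit(currentPmemUsage, curRssMemUsageOfAgedProcesses, pmemLimit)) {
  eventDispatcher.getEventHandler().handle(
      new ContainerKillEvent(containerId,
          ContainerExitStatus.KILLED_EXCEEDED_PMEM,
          "Container is running beyond physical memory limits."));
}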