【Hadoop】【YARN】Container Resource Isolation in the NodeManager
ContainerManagerImpl is a Service inside the NodeManager that is responsible, among other things, for container resource isolation.
For example, if a container's resource usage exceeds its limit, the container is killed.
Likewise, if the data it stores in its local directories grows too large, it is also killed.
(1) If virtual memory monitoring is enabled, check whether the total virtual memory used by the process tree exceeds its limit;
(2) If physical memory monitoring is enabled, check whether the total physical memory used by the process tree exceeds its limit.
The virtual and physical memory checks are toggled by the parameters yarn.nodemanager.vmem-check-enabled and yarn.nodemanager.pmem-check-enabled respectively; both default to true, i.e. both checks are enabled.
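For reference, the two checks can be toggled in yarn-site.xml; the fragment below simply spells out the defaults (the values shown are the defaults, not tuning advice):

```xml
<!-- yarn-site.xml fragment: both properties default to true -->
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>true</value>
</property>
```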
A YARN concept is worth restating here: memory resource control in YARN is implemented by YARN itself, whereas CPU resource isolation is implemented through cgroups.
In terms of code modules, memory resource isolation lives in ContainerManagerImpl, and the cgroup integration lives in LinuxContainerExecutor.
This article focuses on YARN's memory resource isolation, or more precisely, enforcement of container memory limits.
NodeManager itself is a Service that contains many smaller sub-services implementing its various functional modules. They are all wired up in NodeManager's own service initialization method:
NodeManager.serviceInit()
ContainerManagerImpl containerManager = createContainerManager(context, exec, del,
    nodeStatusUpdater, this.aclsManager, dirsHandler);
NodeManager.createContainerManager()
protected ContainerManagerImpl createContainerManager(Context context,
    ContainerExecutor exec, DeletionService del,
    NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
    LocalDirsHandlerService dirsHandler) {
  return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
      metrics, dirsHandler);
}
ContainerManagerImpl.ContainerManagerImpl()
public ContainerManagerImpl(Context context, ContainerExecutor exec,
    DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
    NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
  super(ContainerManagerImpl.class.getName());
  this.context = context;
  this.dirsHandler = dirsHandler;
  // The central event dispatcher at the ContainerManager level
  dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher");
  this.deletionService = deletionContext;
  this.metrics = metrics;
  rsrcLocalizationSrvc =
      createResourceLocalizationService(exec, deletionContext, context, metrics);
  addService(rsrcLocalizationSrvc);
  containersLauncher = createContainersLauncher(context, exec);
  addService(containersLauncher);
  this.nodeStatusUpdater = nodeStatusUpdater;
  this.containerScheduler = createContainerScheduler(context);
  addService(containerScheduler);
  AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
      new AuxiliaryLocalPathHandlerImpl(dirsHandler);
  // Start configurable services
  auxiliaryServices =
      new AuxServices(auxiliaryLocalPathHandler, this.context, this.deletionService);
  auxiliaryServices.registerServiceListener(this);
  addService(auxiliaryServices);
  // Initialize the metrics publisher if the timeline service v.2 is enabled
  // and the system publisher is enabled
  Configuration conf = context.getConf();
  if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
    if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
      LOG.info("YARN system metrics publishing service is enabled");
      nmMetricsPublisher = createNMTimelinePublisher(context);
      context.setNMTimelinePublisher(nmMetricsPublisher);
    }
    this.timelineServiceV2Enabled = true;
  }
  this.containersMonitor = createContainersMonitor(exec);
  addService(this.containersMonitor);
  dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher());
  dispatcher.register(ApplicationEventType.class, createApplicationEventDispatcher());
  dispatcher.register(LocalizationEventType.class,
      new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc, nmMetricsPublisher));
  dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
  dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
  dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
  dispatcher.register(ContainerSchedulerEventType.class, containerScheduler);
  addService(dispatcher);
  ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  this.readLock = lock.readLock();
  this.writeLock = lock.writeLock();
}
The constructor initializes five sub-services:
1-ResourceLocalizationService: resource localization
2-ContainersLauncher: container launching
3-ContainerScheduler: container scheduling
4-AuxServices: auxiliary services
5-ContainersMonitor: container monitoring
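The composite-service pattern behind all those addService() calls can be sketched in a few lines. The classes below are a toy stand-in, not Hadoop's real AbstractService/CompositeService hierarchy: a parent service collects children via addService() and then initializes and starts them in registration order.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for Hadoop's Service hierarchy (hypothetical names).
abstract class MiniService {
    final String name;
    MiniService(String name) { this.name = name; }
    void serviceInit() {}   // override to wire up resources
    void serviceStart() {}  // override to start threads, etc.
}

class MiniCompositeService extends MiniService {
    private final List<MiniService> children = new ArrayList<>();
    final List<String> startOrder = new ArrayList<>();

    MiniCompositeService(String name) { super(name); }

    void addService(MiniService s) { children.add(s); }

    @Override
    void serviceInit() {
        for (MiniService s : children) s.serviceInit();
    }

    @Override
    void serviceStart() {
        // Children start in the order they were added, mirroring how
        // ContainerManagerImpl registers its five sub-services.
        for (MiniService s : children) {
            s.serviceStart();
            startOrder.add(s.name);
        }
    }
}

public class CompositeServiceDemo {
    public static void main(String[] args) {
        MiniCompositeService nm = new MiniCompositeService("NodeManager");
        nm.addService(new MiniService("ResourceLocalizationService") {});
        nm.addService(new MiniService("ContainersMonitor") {});
        nm.serviceInit();
        nm.serviceStart();
        // prints [ResourceLocalizationService, ContainersMonitor]
        System.out.println(nm.startOrder);
    }
}
```

The point of the pattern is that NodeManager only has to drive one lifecycle; every child it registered is initialized, started, and (in the real classes) stopped along with it.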
ContainerManagerImpl.createContainersMonitor()
protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) {
  // yarn.nodemanager.container.monitor.factory.class, which defaults to
  // DefaultContainerMonitorFactory; the default is almost always used.
  Class<?> factoryClass = context.getConf().getClass(
      YarnConfiguration.NM_CONTAINER_MONITOR_FACTORY_CLASS,
      DefaultContainerMonitorFactory.class, ContainerMonitorFactory.class);
  ContainerMonitorFactory factory =
      (ContainerMonitorFactory) ReflectionUtils.newInstance(factoryClass, getConfig());
  return factory.getContainerMonitor(exec, dispatcher, this.context);
}
DefaultContainerMonitorFactory.getContainerMonitor()
public ContainersMonitor getContainerMonitor(ContainerExecutor exec,
    AsyncDispatcher dispatcher, Context context) {
  return new ContainersMonitorImpl(exec, dispatcher, context);
}
ContainersMonitorImpl.serviceStart() does two main things:
protected void serviceStart() throws Exception {
  // If yarn.nodemanager.container-monitor.enabled is true and
  // yarn.nodemanager.container-monitor.interval-ms > 0, periodically check
  // whether any container process on this NodeManager exceeds its
  // resource limits.
  if (containersMonitorEnabled) {
    this.monitoringThread.start();
  }
  // yarn.nodemanager.container-log-monitor.enable:
  // monitor the size of the container log directories
  if (logMonitorEnabled) {
    this.logMonitorThread.start();
  }
  super.serviceStart();
}
The monitoring thread periodically checks whether any container process running on this NodeManager exceeds its resource limits. The logic is:
read the /proc/<PID>/stat file to obtain each container process's basic information, reconstruct each container's process tree from it, and thereby compute the total virtual and physical memory used by each process tree.
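As a sketch of what reading /proc/<PID>/stat involves: the command name (field 2) is wrapped in parentheses and may contain spaces, so everything after the last ')' is split on whitespace; vsize (bytes) is field 23 and rss (pages) is field 24. The sample line and the standalone framing below are fabricated for illustration; Hadoop's actual reader is ProcfsBasedProcessTree.

```java
public class ProcStatDemo {
    // Extract vsize (bytes) and rss (pages) from a /proc/<PID>/stat line.
    static long[] parseVsizeRss(String statLine) {
        // Split only the part after the command name's closing ')'.
        int idx = statLine.lastIndexOf(')');
        String[] rest = statLine.substring(idx + 1).trim().split("\\s+");
        // rest[0] is field 3 (state), so field N lands at rest[N - 3]:
        // vsize (field 23) at rest[20], rss (field 24) at rest[21].
        long vsize = Long.parseLong(rest[20]);
        long rss = Long.parseLong(rest[21]);
        return new long[] { vsize, rss };
    }

    public static void main(String[] args) {
        // Fabricated sample stat line, for illustration only.
        String sample = "1234 (java) S 1 1234 1234 0 -1 4194304 100 0 0 0 "
            + "10 5 0 0 20 0 4 0 100 2147483648 52000";
        long[] vr = parseVsizeRss(sample);
        System.out.println("vsize=" + vr[0] + " rssPages=" + vr[1]);
    }
}
```

To get the physical memory in bytes, rss (a page count) still has to be multiplied by the system page size, which is what Hadoop's process-tree code does internally.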
Although the procedure above yields each container process tree's memory usage (physical and virtual), we cannot decide to kill a container based solely on its process tree's memory usage at a single instant, because a process tree's memory usage fluctuates. To avoid killing containers by mistake, Hadoop gives every process an "age" attribute: a newly started process has age 1, and every time the MonitoringThread runs an update, each process's age is incremented by one. On this basis, the criterion for killing a container is as follows:
if the total memory usage (physical or virtual) of all processes in a container's process tree (age > 0) exceeds twice the limit, or the total memory usage of all processes with age greater than 1 exceeds the limit, the container is deemed to have exceeded its memory allocation and may be killed. So the resource "isolation" inside YARN is, in reality, memory usage monitoring.
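The kill criterion above can be sketched as a pure function. This is a simplified model of the check (Hadoop's real logic lives in ContainersMonitorImpl's over-limit test; the names and the standalone framing here are mine):

```java
public class OverLimitDemo {
    /**
     * Simplified model of the over-limit check: a container is killable if
     * the whole tree (all ages) is over twice the limit, or the aged part
     * (processes with age > 1) is over the limit on its own.
     */
    static boolean isOverLimit(long totalMemAllAges,
                               long totalMemAgedProcs,
                               long limit) {
        return totalMemAllAges > 2 * limit || totalMemAgedProcs > limit;
    }

    public static void main(String[] args) {
        long limit = 1024;
        // Transient spike in young processes, below 2x the limit: tolerated.
        System.out.println(isOverLimit(1500, 0, limit));    // false
        // Spike beyond 2x the limit: killed even if every process is new.
        System.out.println(isOverLimit(2500, 0, limit));    // true
        // Aged processes alone exceed the limit: killed.
        System.out.println(isOverLimit(1500, 1100, limit)); // true
    }
}
```

The 2x allowance gives a freshly forked process tree (e.g. right after a fork/exec, when memory is briefly double-counted) one monitoring interval of grace before the stricter age > 1 rule applies.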
ContainersMonitorImpl.MonitoringThread.run()
public void run() {
  while (!stopped && !Thread.currentThread().isInterrupted()) {
    // Temporary structure to calculate the total resource utilization of
    // the containers (physical memory, virtual memory, and CPU usage)
    ResourceUtilization trackedContainersUtilization =
        ResourceUtilization.newInstance(0, 0, 0.0f);
    // Now do the monitoring for the trackingContainers
    // Check memory usage and kill any overflowing containers
    long vmemUsageByAllContainers = 0;
    long pmemByAllContainers = 0;
    long cpuUsagePercentPerCoreByAllContainers = 0;
    for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers
        .entrySet()) {
      ContainerId containerId = entry.getKey();
      ProcessTreeInfo ptInfo = entry.getValue();
      try {
        // Initialize uninitialized process trees
        initializeProcessTrees(entry);
        String pId = ptInfo.getPID();
        if (pId == null || !isResourceCalculatorAvailable()) {
          continue; // processTree cannot be tracked
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Constructing ProcessTree for : PID = " + pId
              + " ContainerId = " + containerId);
        }
        ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
        // Set context, containerId, pId info on pTree, which will be used
        // by the implementing class ProcfsBasedProcessTree to read the
        // smaps file
        pTree.setReadFileAsUser(
            new SMAPSReadFileAsUserImpl(context, containerId, pId));
        pTree.updateProcessTree(); // update process-tree
        long currentVmemUsage = pTree.getVirtualMemorySize();
        long currentPmemUsage = pTree.getRssMemorySize();
        // if machine has 6 cores and 3 are used,
        // cpuUsagePercentPerCore should be 300%
        float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
        if (cpuUsagePercentPerCore < 0) {
          // CPU usage is not available, likely because the container just
          // started. Let us skip this turn and consider this container
          // in the next iteration.
          LOG.info("Skipping monitoring container " + containerId
              + " since CPU usage is not yet available.");
          continue;
        }
        recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage,
            currentPmemUsage, trackedContainersUtilization);
        checkLimit(containerId, pId, pTree, ptInfo,
            currentVmemUsage, currentPmemUsage);
        // Account for the total memory in use by all containers
        vmemUsageByAllContainers += currentVmemUsage;
        pmemByAllContainers += currentPmemUsage;
        // Account for the total CPU usage of all containers
        cpuUsagePercentPerCoreByAllContainers += cpuUsagePercentPerCore;
        reportResourceUsage(containerId, currentPmemUsage,
            cpuUsagePercentPerCore);
      } catch (Exception e) {
        // Log the exception and proceed to the next container.
        LOG.warn("Uncaught exception in ContainersMonitorImpl "
            + "while monitoring resource of {}", containerId, e);
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total Resource Usage stats in NM by all containers : "
          + "Virtual Memory= " + vmemUsageByAllContainers
          + ", Physical Memory= " + pmemByAllContainers
          + ", Total CPU usage(% per core)= "
          + cpuUsagePercentPerCoreByAllContainers);
    }
    // Save the aggregated utilization of the containers
    setContainersUtilization(trackedContainersUtilization);
    // Publish the container utilization metrics to the node manager
    // metrics system.
    NodeManagerMetrics nmMetrics = context.getNodeManagerMetrics();
    if (nmMetrics != null) {
      nmMetrics.setContainerUsedMemGB(
          trackedContainersUtilization.getPhysicalMemory());
      nmMetrics.setContainerUsedVMemGB(
          trackedContainersUtilization.getVirtualMemory());
      nmMetrics.setContainerCpuUtilization(
          trackedContainersUtilization.getCPU());
    }
    try {
      Thread.sleep(monitoringInterval);
    } catch (InterruptedException e) {
      LOG.warn(ContainersMonitorImpl.class.getName()
          + " is interrupted. Exiting.");
      break;
    }
  }
}