【Free Style】Hadoop-Yarn之Resource Manager源码分析(二)
接上文:
Hadoop-Yarn之Resource Manager源码分析(一)
https://portal.huaweicloud.com/blogs/bc5a914ebfb111e7b8317ca23e93a891
3 YARN RM模块分析
3.1 事件及服务
YARN采用了服务模型,它将各种对象服务化,以便于统一管理(统一启动、关闭等)。YARN中将服务分为两种:单一服务和组合服务,组合服务是将多个单个服务组合在一起。
单一服务需继承AbstractService抽象类,而组合服务需继承CompositeService抽象类。
RM中的对象均被封装为服务和事件处理器,其中封装成服务是为了统一管理,而所有事件处理器处理来自中央总调度器分配的事件,总调度器将各个服务发送出来的事件按照事件类型分配给对应的事件处理器,如此循环,构成了RM的整体架构。
(1)RM中的服务
在RM中,AdminService和ClientRMService为单一服务,而ResourceManager(此处是指ResourceManager类,不是指ResourceManager模块)则为组合服务。
(2)RM中的事件处理器
每个对象均可能触发一定的事件,这些事件被统一传递给AsyncDispatcher,AsyncDispatcher收到事件后将之放到一个事件队列中,异步进行处理,比如,如果是AMLauncherEvent类型的事件,则分配给事件处理器ApplicationMasterLauncher,如果是SchedulerEvent类型事件,则分配给事件处理器ResourceScheduler处理。
RM中有以下8种主要的事件。
事件类型 | 状态机 | 事件处理器 |
RMAppEvent | RMApp | ApplicationEventDispatcher |
RMAppAttemptEvent | RMAppAttempt | ApplicationAttemptEventDispatcher |
RMNodeEvent | RMNode | NodeEventDispatcher |
RMContainerEvent | RMContainer | - |
SchedulerEvent | - | SchedulerEventDispatcher |
AMLauncherEvent | - | ApplicationMasterLauncher |
NodesListManagerEvent | - | NodesListManager |
RMAppManagerEvent | - | RMAppManager |
注意:RMContainer状态机不具有异步事件处理器,都是同步调用进行的。
后面会详细介绍上述事件、状态机及事件处理器。
3.2 RM组件分析
如上图所示RM可以分为以下6大核心组件:
(1)用户交互
YARN分别针对普通用户,管理员和Web提供了三种对外服务,分别对应ClientRMService、AdminService和RMWebService:
ClientRMService
ClientRMService是为普通用户提供的服务,它会处理来自客户端各种RPC请求,比如提交应用程序、终止应用程序,获取应用程序运行状态等。
AdminService
YARN为管理员提供了一套独立的服务接口,以防止大量的普通用户请求使管理员发送的管理命令饿死,管理员可通过这些接口管理集群,比如动态更新节点列表,更新ACL列表,更新队列信息等。
RMWebService
为了更加友好地展示集群资源使用情况和应用程序运行状态等信息,YARN对外提供了一个Web 界面。
(2)NM管理
ResourceTrackerService
处理来自NM的请求,主要包括两种请求:注册和心跳,其中,注册是NM启动时发生的行为,请求包中包含节点ID,可用的资源上限等信息,而心跳是周期性行为,包含各个Container运行状态,运行的Application列表、节点健康状况(可通过一个脚本设置),而ResourceTrackerService则为NM返回待释放的Container列表、Application列表等。
NMLivelinessMonitor
监控NM是否活着,如果一个NodeManager在一定时间(默认为10min)内未汇报心跳信息,则认为它死掉了,会将其从集群中移除。
NodesListManager
维护正常节点和异常节点列表,管理exlude(类似于黑名单)和inlude(类似于白名单)节点列表,这两个列表均是在配置文件中设置的,可以动态加载。
(3)AM管理
ApplicationMasterLauncher
与NM通信,要求它为某个应用程序启动AM。
ApplicationMasterService
处理来自AM的请求,主要包括两种请求:注册和心跳,其中,注册是AM启动时发生的行为,包括请求包中包含所在节点,RPC端口号和tracking URL等信息,而心跳是周期性 行为,包含请求资源的类型描述、待释放的Container列表等,而AMS则为之返回新分配的Container、失败的Container等信息。
AMLivelinessMonitor
监控AM是否活着,如果一个AM在一定时间(默认为10min)内未汇报心跳信息,则认为它死掉了,它上面所有正在运行的Container将被认为死亡,AM本身会被重新分配到另外一个节点上(用户可指定每个AM的尝试次数,默认是1次)执行。ApplicationAttempt是在ApplicationMasterLauncher执行startContainer之后就向AMlivelinessMonitor注册,并非是在AM向AMS注册时进行的。
(4)Application管理
ApplicationACLsManager
管理应用程序访问权限,包含两部分权限:查看和修改,查看主要指查看应用程序基本信息,而修改则主要是修改应用程序优先级、杀死应用程序等。
RMAppManager
管理应用程序的启动和关闭。
ContainerAllocationExpirer
YARN不允许AM获得Container后长时间不对其使用,因为这会降低整个集群的利用率。当AM收到RM新分配的一个Container后,必须在一定的时间(默认为10min)内在对应的NM上启动该Container,否则,RM会回收该Container。
(5)安全管理
ResourceManage自带了非常全面的权限管理机制,主要由ClientToAMSecretManager、ContainerTokenSecretManager、ApplicationTokenSecretManager等模块完成。
(6)资源分配
ResourceScheduler
ResourceScheduler是资源调度器,它按照一定的约束条件(比如队列容量限制等)将集群中的资源分配给各个应用程序。ResourceScheduler是一个插拔式模块,默认是FIFO实现,YARN还提供了Fair Scheduler和Capacity Scheduler两个多租户调度器。
3.3 RM代码结构
根目录:对RM的各类服务的实现,包括RM服务主体,User服务,NM管理服务,APP管理服务,安全管理服务等;
ahs:application history&state保存实现以及相关事件;
amlauncher:AM启动以及关闭的实现,主要与NM进行通信;
blacklist:暂时不明确作用
metrics:暂时不明确作用
monitor:调度监控实现
nodelables:node lable的管理实现,node lable的作用待确定
placement:Application放置决策及执行的实现;
recovery:HA相关实现,主要是用于application的状态存储及恢复;
reservation:预留相关的操作,具体功能待确定
resource:资源管理相关属性,包括动态资源配置(vcpu个数,内存复用),优先级,资源类型,资源权重等定义。
rmapp:application状态机及事件管理
rmcontainer:container状态机及事件管理
rmnode:node状态机及事件管理;
scheduler:调度器实现,包括了fifo scheduler,Capacity scheduler ,fair scheduler等调度策略实现;
security:安全相关实现;
webapp:Resource Manage web界面实现。
3.4 RM状态机分析
3.4.1 状态机工厂及状态机生成
Yarn是一个基于事件的编程框架,整个框架的逻辑都是基于事件驱动对象的状态迁移,因此在Yarn中状态机的实现是整个系统的心脏。下面以RMAppAttempt状态机的实现为例进行分析。如下图是整个状态机的对象关系。RMAppAttempt如上面介绍,是一个Application的一次尝试,如果Application出现错误或者启动失败,则会为该Application重新创建一个RMAppAttempt。同时RMAppAttempt类是一个抽象类,具体的实现类为RMAppAttemptImpl。
在RMAppAttemptImpl类中定义了两个成员变量:stateMachineFactory,stateMachine。这两个成员变量分别由两个模板类StateMachineFactory和StateMachine进行定义,使用模板类的原因是为了可以在所有状态机中使用。
private final StateMachine<RMAppAttemptState, RMAppAttemptEventType, RMAppAttemptEvent> stateMachine; private static final StateMachineFactory<RMAppAttemptImpl, RMAppAttemptState, RMAppAttemptEventType, RMAppAttemptEvent> stateMachineFactory; |
StateMachineFactory是一个状态机工厂,用于生成StateMachine。
接下来我们看一下StateMachineFactory是如何定义的:
private static final StateMachineFactory<RMAppAttemptImpl, RMAppAttemptState, RMAppAttemptEventType, RMAppAttemptEvent> stateMachineFactory = new StateMachineFactory<RMAppAttemptImpl, RMAppAttemptState, RMAppAttemptEventType, RMAppAttemptEvent>(RMAppAttemptState.NEW)
// Transitions from NEW State .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, RMAppAttemptEventType.START, new AttemptStartedTransition()) ... // Transitions from SUBMITTED state .addTransition(RMAppAttemptState.SUBMITTED, EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.ATTEMPT_ADDED, new ScheduleTransition()) … // Transitions from SCHEDULED State .addTransition(RMAppAttemptState.SCHEDULED, EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.CONTAINER_ALLOCATED, new AMContainerAllocatedTransition()) … // Transitions from ALLOCATED_SAVING State .addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.ALLOCATED, RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition()) … // Transitions from ALLOCATED State .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED, RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition()) … // Transitions from LAUNCHED State .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING, RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition()) … // Transitions from RUNNING State .addTransition(RMAppAttemptState.RUNNING, EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED), RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition()) … .installTopology(); |
该定义包含了一个new操作,一系列的addTransition操作以及一个installTopology操作。
首先创建一个statMachineFactory对象,并将状态机设置为RMAppAttemptState.NEW。
public StateMachineFactory(STATE defaultInitialState) { this.transitionsLi s t N ode = null; this.defaultInitialState = defaultInitialState; this.optimized = false; this.stateMachineTable = null; } |
然后调用addTransition,该函数用于创建“状态迁移规则”,但是每一个状态迁移规则都会生成一个新的StateMachineFactory对象,并通过TransitionsLi s t N ode进行连接成为一个Transition链表。如下是addTransition的函数形式:
public StateMachineFactory <OPERAND, STATE, EVENTTYPE, EVENT> addTransition(STATE preState, STATE postState, EVENTTYPE eventType) { return addTransition(preState, postState, eventType, null); } public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition( STATE preState, STATE postState, Set<EVENTTYPE> eventTypes) { return addTransition(preState, postState, eventTypes, null); } public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition( STATE preState, STATE postState, Set<EVENTTYPE> eventTypes, SingleArcTransition<OPERAND, EVENT> hook) { StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> factory = null; //如果有多个事件可以从同一个状态达到另外一个相同状态,则需要单独添加 for (EVENTTYPE event : eventTypes) { if (factory == null) { factory = addTransition(preState, postState, event, hook); } else { factory = factory.addTransition(preState, postState, event, hook); } } return factory; }
public StateMachineFactory <OPERAND, STATE, EVENTTYPE, EVENT> addTransition(STATE preState, STATE postState, EVENTTYPE eventType, SingleArcTransition<OPERAND, EVENT> hook){ return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> (this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT> (preState, eventType, new SingleInternalArc(postState, hook))); }
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(STATE preState, Set<STATE> postStates, EVENTTYPE eventType, MultipleArcTransition<OPERAND, EVENT, STATE> hook){ return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> (this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT> (preState, eventType, new MultipleInternalArc(postStates, hook))); } |
由于字数限制,下一篇接着探讨~~~
- 点赞
- 收藏
- 关注作者
评论(0)