【Free Style】Hadoop-Yarn之Resource Manager源码分析(三)
接上篇:
【Free Style】Hadoop-Yarn之Resource Manager源码分析(二)
https://portal.huaweicloud.com/blogs/4abcd5ebc04811e7b8317ca23e93a891
addTransition的四个参数分别表示:当前状态preState,后续状态postStates,触发事件eventType,状态迁移操作接口hook。其中后续状态postStates可以是一个集合,这种场景下后续状态由接口hook的执行结果决定,此时hook函数的类型为MultipleArcTransition(如ScheduleTransition,AMContainerAllocatedTransition等都是该接口的实现),否则该接口类型为SingleArcTransition(如AttemptStartedTransition,AttemptStoredTransition,AMLaunchedTransition等都是该类型接口的实现)。不过最终都会统一封装成ApplicableSingleOrMultipleTransition类。在addTransition中基于下述StateMachineFactory构造函数创建一个StateMachineFactory对象:
private StateMachineFactory (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) { //初始状态拷贝 this.defaultInitialState = that.defaultInitialState; this.transitionsListN ode = new TransitionsListN ode(t, that.transitionsListN ode); this.optimized = false; this.stateMachineTable = null; } private interface ApplicableTransition <OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> { void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject); }
private class TransitionsListN ode { final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition; final TransitionsListN ode next; TransitionsListN ode(ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition, TransitionsListN ode next) { //插入TransitionsListN ode列表首部 this.transition = transition; this.next = next; } }
|
可以发现,新创建的StateMachineFactory对象都会通过TransitionListN ode进行串联,并插入到StateMachineFactory对象list的首部。同时TransitionsListN ode中也保存了状态迁移接口transition。事实上,真正串联的是Transition,而StateMachineFactory对象只有最后一次被创建的对象才会被真正使用(其实此时还不是最后一个,最后一个在下一步installTopology()中生成)。
然后我们再看一下installTopology()函数的作用,该函数也会创建一个StateMachineFactory对象,同时会将其transitionsListN ode指向Transition列表的首部,然后基于该Transition列表生成stateMachineTable,该table是一个hashmap,可以基于当前状态和事件快速查找到对应的状态迁移Transition接口。
private Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable; public StateMachineFactory <OPERAND, STATE, EVENTTYPE, EVENT> installTopology() { return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true); }
private StateMachineFactory (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, boolean optimized) { this.defaultInitialState = that.defaultInitialState; this.transitionsListN ode = that.transitionsListN ode; this.optimized = optimized; makeStateMachineTable(); }
private void makeStateMachineTable() { Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack = new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();
Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();
prototype.put(defaultInitialState, null);
//初始化stateMachineTable stateMachineTable = new EnumMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype); //将所有TransitionListN ode形成一个临时的栈,然后再逐一加入到stateMachineTable for (TransitionsListN ode cursor = transitionsListN ode; cursor != null; cursor = cursor.next) { stack.push(cursor.transition); }
while (!stack.isEmpty()) { stack.pop().apply(this); } }
public void apply (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) { Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap = subject.stateMachineTable.get(preState); if (transitionMap == null) { transitionMap = new HashMap<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>(); subject.stateMachineTable.put(preState, transitionMap); } //< eventType, transition >加入到stateMachineTable对应节点中的transitionMap transitionMap.put(eventType, transition); } |
至此,StateMachineFactory对象创建完成。其形成的状态机表stateMachineTable如下所示:
MAP<当前状态, MAP<事件类型, Transition(状态机,新的状态,事件类型,事件)>> |
接下来开始生成StateMachine对象,创建过程指定操作对象(RMAppAttempt对象)。
this.stateMachine = stateMachineFactory.make(this); public StateMachine<STATE, EVENTTYPE, EVENT> make(OPERAND operand) { return new InternalStateMachine(operand, defaultInitialState); } |
3.4.2 事件处理
上述过程将状态机的创建和生成进行了详细的介绍,接下来介绍一个事件被触发之后如何执行状态迁移以及状态迁移接口的。仍然以RMAppAttempt状态机为例,介绍如何从NEW状态转变为SUBMITTED状态的。
RM中包含了一个事件分发器AsyncDispatcher,该事件分发器会基于事件类型将对应的事件分发给各个事件处理器(在3.1节中已经介绍了RM中存在的事件处理器)。在AsyncDispatcher中存在一个事件队列,用于实现事件产生和消费的异步操作。
//in AsyncDispatcher.java public AsyncDispatcher() { this(new LinkedBlockingQueue<Event>()); }
public AsyncDispatcher(BlockingQueue<Event> eventQueue) { super("Dispatcher"); this.eventQueue = eventQueue; this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>(); }
//将事件加入队列中 public void handle(Event event) { … eventQueue.put(event); … }
//in RMAppImpl.java,创建RMAppAttempt之后触发RMAppStartAttemptEvent(RMAppAttemptEventType.START)事件,将事件加入到eventQueue。 handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(), transferStateFromPreviousAttempt));
|
RMAppAttempt状态机对应的事件类型是RMAppAttemptEventType,事件处理器是ApplicationAttemptEventDispatcher,系统启动是需要先调用register函数注册该事件类型及其处理器。当有对应事件达到时,AsyncDispatcher中运行线程会从队列中取出事件(run函数),然后调用dispatcher函数基于事件类型调用对应的事件处理器的handle函数。
//in ResourceManager.java,事件及其处理器注册 protected void serviceInit(Configuration configuration) throws Exception { … rmDispatcher.register(RMAppAttemptEventType.class, new ApplicationAttemptEventDispatcher(rmContext)); … }
//in AsyncDispatcher.java public void register(Class<? extends Enum> eventType, EventHandler handler) { … eventDispatchers.put(eventType, handler); … }
//读取事件并分发 public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { Event event; … event = eventQueue.take(); … if (event != null) { dispatch(event); } } }
protected void dispatch(Event event) {
Class<? extends Enum> type = event.getType().getDeclaringClass(); … //基于事件类型获得事件处理器 EventHandler handler = eventDispatchers.get(type); if(handler != null) { handler.handle(event); } … } |
接下来我们就可以看一下ApplicationAttemptEventDispatcher类的定义,找到对应的handle函数,该函数基于RMAppAttempt ID找到对应的的RMAppAttempt对象,然后执行对应的RMAppAttempt的handle函数。在RMAppAttempt.handle中我们可以发现他执行了这个状态机的doTransition函数了,ok,这就进入到状态机处理环节了,和上一节的内容就对接起来了。
//in ResourceManager.java public void handle(RMAppAttemptEvent event) { ApplicationAttemptId appAttemptID = event.getApplicationAttemptId(); ApplicationId appAttemptId = appAttemptID.getApplicationId(); RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId); if (rmApp != null) { RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID); if (rmAppAttempt != null) { try { rmAppAttempt.handle(event); } } } …. }
// in RMAppAttemptImpl.java public void handle(RMAppAttemptEvent event) {
… try { ApplicationAttemptId appAttemptID = event.getApplicationAttemptId(); this.stateMachine.doTransition(event.getType(), event); } … } |
接下来我们看一下doTransition函数具体做了什么事情。具体见下面代码中的注释。
RMAppAttemptEventType.START, new AttemptStartedTransition())
public syn c hron ized STATE doTransition(EVENTTYPE eventType, EVENT event) throws InvalidStateTransitionException { //按照之前添加的StateMachineFactory的Transition对该事件进行处理,并修改当前状态为目标状态 。currentState = StateMachineFactory.this.doTransition (operand, currentState, eventType, event); return currentState; } //进过层层调用,最后执行还函数,其中hook就是之前addTransition中注册的接口。这个中间是通过状态以及事件从状态机表stateMachineTable中找到对应的Transition的。 public STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType) { if (hook != null) { hook.transition(operand, event); } return postState; } //状态机创建的时候增加的的状态迁移 addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, |
接下来我们看一下AttemptStartedTransition的实现,其核心就是实现了transition函数,即上述doTransition中调用的hook.transition函数,该函数实现对RMAppAttemptEventType.START事件的处理。该处理函数中又会触发新的事件,继续对Application
进行下一步操作。所以说
Yarn
是一个基于事件的编程框架。
}
//in RMAppAttemptImpl.java public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { boolean transferStateFromPreviousAttempt = false; // 将RMAppAttempt注册到ApplicationMasterService服务中,为后续AM的注册做好准备。appAttempt.masterService .registerAppAttempt(appAttempt.applicationAttemptId); if (UserGroupInformation.isSecurityEnabled()) { appAttempt.clientTokenMasterKey = appAttempt.rmContext.getClientToAMTokenSecretManager() .createMasterKey(appAttempt.applicationAttemptId); } // 将ApplicationAttempt加入到scheduler中,并触发新的事件通知scheduler。appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( appAttempt.applicationAttemptId, transferStateFromPreviousAttempt)); |
因字数限制,下篇再探讨~~
- 点赞
- 收藏
- 关注作者
评论(0)