【Free Style】Hadoop-Yarn之Resource Manager源码分析(三)

举报
pappy 发表于 2017/11/03 16:44:57 2017/11/03
【摘要】 接上篇:【Free Style】Hadoop-Yarn之Resource Manager源码分析(二)https://portal.huaweicloud.com/blogs/4abcd5ebc04811e7b8317ca23e93a891 addTransition的四个参数分别表示:当前状态preState,后续状态postStates,触发事件eventType,状态迁移操作接口hook。其

接上篇:


【Free Style】Hadoop-Yarn之Resource Manager源码分析(二)


https://portal.huaweicloud.com/blogs/4abcd5ebc04811e7b8317ca23e93a891



addTransition的四个参数分别表示:当前状态preState,后续状态postStates,触发事件eventType,状态迁移操作接口hook。其中后续状态postStates可以是一个集合,这种场景下后续状态由接口hook的执行结果决定,此时hook函数的类型为MultipleArcTransition(如ScheduleTransitionAMContainerAllocatedTransition等都是该接口的实现),否则该接口类型为SingleArcTransition(如AttemptStartedTransitionAttemptStoredTransitionAMLaunchedTransition等都是该类型接口的实现)。不过最终都会统一封装成ApplicableSingleOrMultipleTransition类。在addTransition中基于下述StateMachineFactory构造函数创建一个StateMachineFactory对象:


private StateMachineFactory (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,

     ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {

  //初始状态拷贝

  this.defaultInitialState = that.defaultInitialState;

  this.transitionsListN ode = new TransitionsListN ode(t, that.transitionsListN ode);

  this.optimized = false;

  this.stateMachineTable = null;

}

private interface ApplicableTransition

           <OPERAND, STATE extends Enum<STATE>,

            EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {

  void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);

}

 

private class TransitionsListN ode {

  final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

  final TransitionsListN ode next;

   TransitionsListN ode(ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,

      TransitionsListN ode next) {

    //插入TransitionsListN ode列表首部

    this.transition = transition;

    this.next = next;

  }

}

 


可以发现,新创建的StateMachineFactory对象都会通过TransitionListN ode进行串联,并插入到StateMachineFactory对象list的首部。同时TransitionsListN ode中也保存了状态迁移接口transition事实上,真正串联的是Transition,而StateMachineFactory对象只有最后一次被创建的对象才会被真正使用(其实此时还不是最后一个,最后一个在下一步installTopology()中生成)

然后我们再看一下installTopology()函数的作用,该函数也会创建一个StateMachineFactory对象,同时会将其transitionsListN ode指向Transition列表的首部,然后基于该Transition列表生成stateMachineTable,该table是一个hashmap,可以基于当前状态和事件快速查找到对应的状态迁移Transition接口。


private Map<STATE, Map<EVENTTYPE,

Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable;

public StateMachineFactory <OPERAND, STATE, EVENTTYPE, EVENT> installTopology() {

    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);

  }

 

private StateMachineFactory

    (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,

     boolean optimized) {

  this.defaultInitialState = that.defaultInitialState;

  this.transitionsListN ode = that.transitionsListN ode;

  this.optimized = optimized;

  makeStateMachineTable();

}

 

private void makeStateMachineTable() {

  Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =

    new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();

 

  Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>

    prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();

 

  prototype.put(defaultInitialState, null);

 

//初始化stateMachineTable

  stateMachineTable = new EnumMap<STATE, Map<EVENTTYPE,

                         Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);

  //将所有TransitionListN ode形成一个临时的栈,然后再逐一加入到stateMachineTable

  for (TransitionsListN ode cursor = transitionsListN ode; cursor != null; cursor = cursor.next) {

    stack.push(cursor.transition);

  }

 

  while (!stack.isEmpty()) {

    stack.pop().apply(this);

  }

}

 

public void apply (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {

  Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap

    = subject.stateMachineTable.get(preState);

  if (transitionMap == null) {

    transitionMap = new HashMap<EVENTTYPE,

      Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();

    subject.stateMachineTable.put(preState, transitionMap);

  }

//< eventType, transition >加入到stateMachineTable对应节点中的transitionMap

  transitionMap.put(eventType, transition);

}


至此,StateMachineFactory对象创建完成。其形成的状态机表stateMachineTable如下所示:


MAP<当前状态, MAP<事件类型, Transition(状态机,新的状态,事件类型,事件)>>


 

接下来开始生成StateMachine对象,创建过程指定操作对象(RMAppAttempt对象)。


this.stateMachine = stateMachineFactory.make(this);

public StateMachine<STATE, EVENTTYPE, EVENT> make(OPERAND operand) {

   return new InternalStateMachine(operand, defaultInitialState);

}


3.4.2        事件处理

上述过程将状态机的创建和生成进行了详细的介绍,接下来介绍一个事件被触发之后如何执行状态迁移以及状态迁移接口的。仍然以RMAppAttempt状态机为例,介绍如何从NEW状态转变为SUBMITTED状态的。

RM中包含了一个事件分发器AsyncDispatcher,该事件分发器会基于事件类型将对应的事件分发给各个事件处理器(在3.1节中已经介绍了RM中存在的事件处理器)。在AsyncDispatcher中存在一个事件队列,用于实现事件产生和消费的异步操作。


//in AsyncDispatcher.java

  public AsyncDispatcher() {

    this(new LinkedBlockingQueue<Event>());

  }

 

  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {

    super("Dispatcher");

    this.eventQueue = eventQueue;

    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();

  }

 

//将事件加入队列中

    public void handle(Event event) {

     

        eventQueue.put(event);

   

}

 

 

//in RMAppImpl.java,创建RMAppAttempt之后触发RMAppStartAttemptEvent(RMAppAttemptEventType.START)事件,将事件加入到eventQueue。

handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),

      transferStateFromPreviousAttempt));

 


 

RMAppAttempt状态机对应的事件类型是RMAppAttemptEventType,事件处理器是ApplicationAttemptEventDispatcher,系统启动是需要先调用register函数注册该事件类型及其处理器。当有对应事件达到时,AsyncDispatcher中运行线程会从队列中取出事件(run函数),然后调用dispatcher函数基于事件类型调用对应的事件处理器的handle函数。


//in ResourceManager.java,事件及其处理器注册

protected void serviceInit(Configuration configuration) throws Exception {    

rmDispatcher.register(RMAppAttemptEventType.class,

          new ApplicationAttemptEventDispatcher(rmContext));

}

 

//in AsyncDispatcher.java

  public void register(Class<? extends Enum> eventType, EventHandler handler) {

      eventDispatchers.put(eventType, handler);

  }

 

//读取事件并分发

public void run() {

while (!stopped && !Thread.currentThread().isInterrupted()) {

    Event event;

 

    event = eventQueue.take();

    if (event != null) {

      dispatch(event);

    }

}

 }

 

 

  protected void dispatch(Event event) {

 

    Class<? extends Enum> type = event.getType().getDeclaringClass();

  //基于事件类型获得事件处理器

      EventHandler handler = eventDispatchers.get(type);

      if(handler != null) {

        handler.handle(event);

      }

    

}


接下来我们就可以看一下ApplicationAttemptEventDispatcher类的定义,找到对应的handle函数,该函数基于RMAppAttempt ID找到对应的的RMAppAttempt对象,然后执行对应的RMAppAttempthandle函数。在RMAppAttempt.handle中我们可以发现他执行了这个状态机的doTransition函数了,ok,这就进入到状态机处理环节了,和上一节的内容就对接起来了。


//in ResourceManager.java

    public void handle(RMAppAttemptEvent event) {

      ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();

      ApplicationId appAttemptId = appAttemptID.getApplicationId();

      RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId);

      if (rmApp != null) {

        RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID);

        if (rmAppAttempt != null) {

          try {

            rmAppAttempt.handle(event);

          }

        }

      }

      ….

}

 

// in RMAppAttemptImpl.java

  public void handle(RMAppAttemptEvent event) {

 

    

    try {

      ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();

        this.stateMachine.doTransition(event.getType(), event);

      }

     

  }


接下来我们看一下doTransition函数具体做了什么事情。具体见下面代码中的注释。


          RMAppAttemptEventType.START, new AttemptStartedTransition())

public syn

c hron ized STATE doTransition(EVENTTYPE eventType, EVENT event)

throws InvalidStateTransitionException  {

//按照之前添加的StateMachineFactory的Transition对该事件进行处理,并修改当前状态为目标状态

  

currentState = StateMachineFactory.this.doTransition

    

(operand, currentState, eventType, event);

  

return currentState;

 

}

//进过层层调用,最后执行还函数,其中hook就是之前addTransition中注册的接口。这个中间是通过状态以及事件从状态机表stateMachineTable中找到对应的Transition的。

public STATE doTransition(OPERAND operand, STATE oldState,

                         

EVENT event, EVENTTYPE eventType) {

if (hook != null) {

    

hook.transition(operand, event);

  

}

  

return postState;


}

 

//状态机创建的时候增加的的状态迁移

addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,


接下来我们看一下AttemptStartedTransition的实现,其核心就是实现了transition函数,即上述doTransition中调用的hook.transition函数,该函数实现对RMAppAttemptEventType.START事件的处理。该处理函数中又会触发新的事件,继续对Application

进行下一步操作。所以说


Yarn


是一个基于事件的编程框架。


}

//in RMAppAttemptImpl.java

public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {


           

boolean transferStateFromPreviousAttempt = false; 

  //

将RMAppAttempt注册到ApplicationMasterService服务中,为后续AM的注册做好准备。

  

appAttempt.masterService


    

.registerAppAttempt(appAttempt.applicationAttemptId);

   

if (UserGroupInformation.isSecurityEnabled()) {

    

appAttempt.clientTokenMasterKey =

      

appAttempt.rmContext.getClientToAMTokenSecretManager()

        

.createMasterKey(appAttempt.applicationAttemptId);


    

} 

  // 

将ApplicationAttempt加入到scheduler中,并触发新的事件通知scheduler。

  

appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(

    

appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));



 因字数限制,下篇再探讨~~



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。