[Hadoop][ResourceManager] Source Code Analysis of Job Token Renewal in the ResourceManager

沙漠里的果果酱, published 2023/07/29 18:29:11

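Application token renewal in the ResourceManager is event-driven and is tied mainly to three stages of an application's lifecycle:
1-application submission;
2-application recovery;
3-application completion.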

Taking the submission stage as an example, the core call chain is:
RMAppManager.submitApplication()
---DelegationTokenRenewer.addApplicationAsync()
------DelegationTokenRenewer.processDelegationTokenRenewerEvent()
```java
private void processDelegationTokenRenewerEvent(
    DelegationTokenRenewerEvent evt) {
  serviceStateLock.readLock().lock();
  try {
    if (isServiceStarted) {
      // Key step: hand the event to the renewer thread pool
      Future<?> future =
          renewerService.submit(new DelegationTokenRenewerRunnable(evt));
      futures.put(evt, future);
    } else {
      // Service not started yet: buffer the event
      pendingEventQueue.add(evt);
      int qSize = pendingEventQueue.size();
      if (qSize != 0 && qSize % 1000 == 0
          && lastEventQueueSizeLogged != qSize) {
        lastEventQueueSizeLogged = qSize;
        LOG.info("Size of pending "
            + "DelegationTokenRenewerEvent queue is " + qSize);
      }
    }
  } finally {
    serviceStateLock.readLock().unlock();
  }
}
```

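The most important job of this handler is to wrap the event in a DelegationTokenRenewerRunnable task, submit it to the renewerService thread pool, and record the returned Future in the futures map. If the service has not started yet, the event is buffered in pendingEventQueue instead.
```java
private final Map<DelegationTokenRenewerEvent, Future<?>> futures = new HashMap<>();
```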
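The submit-or-buffer pattern above can be sketched without any Hadoop classes. This is a minimal, hypothetical model (RenewerDispatcher, string events and the Consumer handler are illustrative, not a Hadoop API): a read lock guards the started flag, events run on a thread pool once the service is up, and events that arrive earlier are queued and replayed by start(). The sketch uses a ConcurrentHashMap for the futures because several threads can hold the read lock at the same time.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

// Hypothetical model of processDelegationTokenRenewerEvent(); not Hadoop code.
final class RenewerDispatcher {
  private final ReadWriteLock serviceStateLock = new ReentrantReadWriteLock();
  private final ExecutorService renewerService = Executors.newFixedThreadPool(2);
  // Concurrent map: several threads may hold the read lock and put() at once.
  private final Map<String, Future<?>> futures = new ConcurrentHashMap<>();
  private final BlockingQueue<String> pendingEventQueue = new LinkedBlockingQueue<>();
  private final Consumer<String> handler;
  private boolean isServiceStarted; // guarded by serviceStateLock

  RenewerDispatcher(Consumer<String> handler) {
    this.handler = handler;
  }

  void process(String evt) {
    serviceStateLock.readLock().lock();
    try {
      if (isServiceStarted) {
        // Run the handler on the pool and remember its Future.
        futures.put(evt, renewerService.submit(() -> handler.accept(evt)));
      } else {
        // Too early: buffer the event instead of dropping it.
        pendingEventQueue.add(evt);
      }
    } finally {
      serviceStateLock.readLock().unlock();
    }
  }

  void start() {
    serviceStateLock.writeLock().lock();
    try {
      isServiceStarted = true;
    } finally {
      serviceStateLock.writeLock().unlock();
    }
    // Replay events that arrived before start().
    String evt;
    while ((evt = pendingEventQueue.poll()) != null) {
      process(evt);
    }
  }

  int pendingCount() {
    return pendingEventQueue.size();
  }

  // Waits for one event's task; false if it failed or was never submitted.
  boolean await(String evt) {
    Future<?> f = futures.get(evt);
    if (f == null) {
      return false;
    }
    try {
      f.get();
      return true;
    } catch (ExecutionException e) {
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  void shutdown() {
    renewerService.shutdown();
  }
}
```

Calling process() before start() only buffers the event; start() flips the flag under the write lock and then replays the queue, so an event that races with service startup is delayed rather than lost, which is what the pendingEventQueue in the original method suggests.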
The run() method of the DelegationTokenRenewerRunnable task is implemented as follows:
```java
public void run() {
  serviceStateLock.readLock().lock();
  try {
    if (!isServiceStarted) {
      return;
    }
  } finally {
    serviceStateLock.readLock().unlock();
  }

  if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
    DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
        (DelegationTokenRenewerAppSubmitEvent) evt;
    // Submission path, analyzed below
    handleDTRenewerAppSubmitEvent(appSubmitEvt);
  } else if (evt instanceof DelegationTokenRenewerAppRecoverEvent) {
    DelegationTokenRenewerAppRecoverEvent appRecoverEvt =
        (DelegationTokenRenewerAppRecoverEvent) evt;
    handleDTRenewerAppRecoverEvent(appRecoverEvt);
  } else if (evt.getType().equals(
      DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
    DelegationTokenRenewer.this.handleAppFinishEvent(evt);
  }
}
```

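run() handles the three kinds of events mentioned above:
1-submission event: DelegationTokenRenewerAppSubmitEvent;
2-recovery event: DelegationTokenRenewerAppRecoverEvent;
3-finish event: a DelegationTokenRenewerEvent of type FINISH_APPLICATION, handled by handleAppFinishEvent().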
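Note that the three cases are not matched the same way: submission and recovery are told apart by event class, while completion is matched by event type. A minimal sketch of that dispatch shape, using hypothetical stand-in classes (EventType with its SUBMIT and RECOVER values, RenewerEvent, AppSubmitEvent, AppRecoverEvent and EventRouter are illustrative names, not Hadoop's):

```java
// Hypothetical stand-ins for Hadoop's event classes; not Hadoop code.
enum EventType { SUBMIT, RECOVER, FINISH_APPLICATION }

class RenewerEvent {
  final String appId;
  final EventType type;

  RenewerEvent(String appId, EventType type) {
    this.appId = appId;
    this.type = type;
  }
}

// In Hadoop the submit/recover events also carry credentials, user, etc.
class AppSubmitEvent extends RenewerEvent {
  AppSubmitEvent(String appId) {
    super(appId, EventType.SUBMIT);
  }
}

class AppRecoverEvent extends RenewerEvent {
  AppRecoverEvent(String appId) {
    super(appId, EventType.RECOVER);
  }
}

final class EventRouter {
  // Same shape as run(): two class checks, then one type check.
  static String route(RenewerEvent evt) {
    if (evt instanceof AppSubmitEvent) {
      return "submit " + evt.appId;
    } else if (evt instanceof AppRecoverEvent) {
      return "recover " + evt.appId;
    } else if (evt.type == EventType.FINISH_APPLICATION) {
      return "finish " + evt.appId;
    }
    return "ignored " + evt.appId; // any other event falls through
  }
}
```

A plain event whose type happens to be SUBMIT is not routed to the submission path, because that path is selected by class, not by type.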

The rest of this post walks through the submission event:
DelegationTokenRenewer.DelegationTokenRenewerRunnable.handleDTRenewerAppSubmitEvent()

```java
private void handleDTRenewerAppSubmitEvent(
    DelegationTokenRenewerAppSubmitEvent event) {
  /*
   * For applications submitted with delegation tokens we are not submitting
   * the application to scheduler from RMAppManager. Instead we are doing
   * it from here. The primary goal is to make token renewal as a part of
   * application submission asynchronous so that client thread is not
   * blocked during app submission.
   */
  try {
    // Setup tokens for renewal
    DelegationTokenRenewer.this.handleAppSubmitEvent(event);
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(event.getApplicationId(), RMAppEventType.START));
  } catch (Throwable t) {
    LOG.warn(
        "Unable to add the application to the delegation token renewer.", t);
    // Sending APP_REJECTED is fine, since we assume that the
    // RMApp is in NEW state and thus we haven't yet informed the
    // Scheduler about the existence of the application
    rmContext.getDispatcher().getEventHandler().handle(
        new RMAppEvent(event.getApplicationId(),
            RMAppEventType.APP_REJECTED, t.getMessage()));
  }
}
```
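The point of this method is that the application's fate is decided only after token setup finishes on a renewer thread: success sends START, any Throwable sends APP_REJECTED carrying the exception message. A hypothetical sketch of just that control flow, with strings standing in for RMAppEvent objects (SubmitOutcome and TokenSetup are illustrative names):

```java
// Hypothetical sketch of handleDTRenewerAppSubmitEvent()'s control flow;
// the returned string stands in for the RMAppEvent that gets dispatched.
final class SubmitOutcome {
  // Stand-in for DelegationTokenRenewer.handleAppSubmitEvent(event).
  interface TokenSetup {
    void run() throws Exception;
  }

  static String handleSubmit(String appId, TokenSetup setupTokens) {
    try {
      setupTokens.run();
      // Tokens are registered: let the app move on to the scheduler.
      return appId + " START";
    } catch (Throwable t) {
      // The app is still NEW, so the scheduler has not seen it yet
      // and rejecting it here is safe.
      return appId + " APP_REJECTED: " + t.getMessage();
    }
  }
}
```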

DelegationTokenRenewer.handleAppSubmitEvent()

```java
private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
    throws IOException, InterruptedException {
  ApplicationId applicationId = evt.getApplicationId();
  Credentials ts = evt.getCredentials();
  boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
  if (ts == null) {
    return; // nothing to add
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Registering tokens for renewal for:" +
        " appId = " + applicationId);
  }

  Collection<Token<?>> tokens = ts.getAllTokens();
  long now = System.currentTimeMillis();

  // find tokens for renewal, but don't add timers until we know
  // all renewable tokens are valid
  // At RM restart it is safe to assume that all the previously added tokens
  // are valid
  Set<DelegationTokenToRenew> delegationTokenToRenews =
      Collections.synchronizedSet(new HashSet<>());
  appTokens.put(applicationId, delegationTokenToRenews);
  Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
  boolean hasHdfsToken = false;
  for (Token<?> token : tokens) {
    if (token.isManaged()) {
      if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
        LOG.info(applicationId + " found existing hdfs token " + token);
        hasHdfsToken = true;
      }
      if (skipTokenRenewal(token)) {
        continue;
      }

      DelegationTokenToRenew dttr = allTokens.get(token);
      if (dttr == null) {
        Configuration tokenConf;
        if (evt.tokenConf != null) {
          // Override conf with app provided conf - this is required in cases
          // where RM does not have the required conf to communicate with
          // remote hdfs cluster. The conf is provided by the application
          // itself.
          tokenConf = evt.tokenConf;
          LOG.info("Using app provided token conf for renewal,"
              + " number of configs = " + tokenConf.size());
          if (LOG.isDebugEnabled()) {
            for (Iterator<Map.Entry<String, String>> itor =
                 tokenConf.iterator(); itor.hasNext(); ) {
              Map.Entry<String, String> entry = itor.next();
              LOG.debug("Token conf key is " + entry.getKey()
                  + " and value is " + entry.getValue());
            }
          }
        } else {
          tokenConf = getConfig();
        }
        dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
            tokenConf, now, shouldCancelAtEnd, evt.getUser());
        try {
          renewToken(dttr);
        } catch (IOException ioe) {
          if (ioe instanceof SecretManager.InvalidToken
              && dttr.maxDate < Time.now()
              && evt instanceof DelegationTokenRenewerAppRecoverEvent
              && token.getKind().equals(HDFS_DELEGATION_KIND)) {
            LOG.info("Failed to renew hdfs token " + dttr
                + " on recovery as it expired, requesting new hdfs token for "
                + applicationId + ", user=" + evt.getUser(), ioe);
            requestNewHdfsDelegationTokenAsProxyUser(
                Arrays.asList(applicationId), evt.getUser(),
                evt.shouldCancelAtEnd());
            continue;
          }
          throw new IOException("Failed to renew token: " + dttr.token, ioe);
        }
      }
      tokenList.add(dttr);
    }
  }

  if (!tokenList.isEmpty()) {
    // Renewing token and adding it to timer calls are separated purposefully
    // If user provides incorrect token then it should not be added for
    // renewal.
    for (DelegationTokenToRenew dtr : tokenList) {
      DelegationTokenToRenew currentDtr =
          allTokens.putIfAbsent(dtr.token, dtr);
      if (currentDtr != null) {
        // another job beat us
        currentDtr.referringAppIds.add(applicationId);
        delegationTokenToRenews.add(currentDtr);
      } else {
        delegationTokenToRenews.add(dtr);
        setTimerForTokenRenewal(dtr);
      }
    }
  }

  if (!hasHdfsToken) {
    requestNewHdfsDelegationTokenAsProxyUser(Arrays.asList(applicationId),
        evt.getUser(),
        shouldCancelAtEnd);
  }
}
```
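The catch block inside the loop is the one place where a failed renewal does not end the method with an exception. Spelled out as a standalone predicate (a hypothetical helper, not part of Hadoop; its parameters stand in for the checks on the exception class, dttr.maxDate, the event class and the token kind), all four conditions must hold:

```java
// Hypothetical helper restating the recovery check in handleAppSubmitEvent().
final class RecoveryFallback {
  // Kind string of HDFS delegation tokens (HDFS_DELEGATION_KIND in Hadoop).
  static final String HDFS_DELEGATION_KIND = "HDFS_DELEGATION_TOKEN";

  static boolean shouldRequestNewHdfsToken(boolean renewFailedWithInvalidToken,
      long maxDate, long now, boolean isRecoverEvent, String tokenKind) {
    return renewFailedWithInvalidToken             // ioe instanceof InvalidToken
        && maxDate < now                           // past the token's max date
        && isRecoverEvent                          // only during RM recovery
        && HDFS_DELEGATION_KIND.equals(tokenKind); // only HDFS tokens
  }
}
```

For every other combination the catch block rethrows as IOException("Failed to renew token: ...").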

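The token-renewal logic for the submission (registration) event does the following:
1-It takes all tokens from the credentials carried by the event and filters them: only managed tokens (token.isManaged()) whose renewer field is non-empty (see skipTokenRenewal()) are renewed. In other words, application token renewal is a facility the ResourceManager offers, and whether it is used is decided by the compute framework itself, through the renewer it sets when it obtains the tokens.
2-For each such token that is not already tracked in allTokens, it creates a DelegationTokenToRenew object and renews it once immediately; if that renewal fails, the whole method throws. The one exception is recovery: after an RM restart the HDFS token may already be past its max date, so instead of failing, the RM requests a new HDFS token on behalf of the event's user, because otherwise the application could still fail even after it is relaunched.
3-It puts all DelegationTokenToRenew objects of the application into the appTokens map, keyed by application ID.
4-It adds each newly created DelegationTokenToRenew to the timer-based renewal (setTimerForTokenRenewal()), which renews the token shortly before it expires, until the token reaches its max date. If another application has already registered the same token (allTokens.putIfAbsent() returns an existing entry), no second timer is created; the current application ID is added to that entry's referringAppIds instead.
5-If the credentials contain no HDFS delegation token, it requests a new one from HDFS as a proxy for the user (requestNewHdfsDelegationTokenAsProxyUser()) and adds it to the appTokens map as well; in the Hadoop source this only happens when RM proxy-user privileges are enabled (yarn.resourcemanager.proxy-user-privileges.enabled).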
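Steps 1 to 5 can be condensed into a small in-memory model. This is a hypothetical sketch, not Hadoop code: tokens are reduced to an ID, a kind, a renewer string and a managed flag, the one-off renewal is a Predicate, and scheduling a timer or requesting an HDFS token is only recorded in a list (TokenRegistry, Tok and ToRenew are illustrative names). It keeps the two properties the source comments stress: nothing is scheduled until every new token has renewed successfully, and a token shared by several applications gets a single entry, and a single timer, via putIfAbsent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical in-memory model of handleAppSubmitEvent(); not Hadoop code.
final class TokenRegistry {
  static final String HDFS_KIND = "HDFS_DELEGATION_TOKEN";

  static final class Tok {
    final String id, kind, renewer;
    final boolean managed; // stands in for token.isManaged()

    Tok(String id, String kind, String renewer, boolean managed) {
      this.id = id;
      this.kind = kind;
      this.renewer = renewer;
      this.managed = managed;
    }
  }

  static final class ToRenew {
    final Tok token;
    final Set<String> referringAppIds = ConcurrentHashMap.newKeySet();

    ToRenew(Tok token) {
      this.token = token;
    }
  }

  final Map<String, ToRenew> allTokens = new ConcurrentHashMap<>();
  final Map<String, Set<ToRenew>> appTokens = new ConcurrentHashMap<>();
  final List<String> timersScheduled = Collections.synchronizedList(new ArrayList<>());
  final List<String> hdfsTokenRequests = Collections.synchronizedList(new ArrayList<>());

  void register(String appId, List<Tok> tokens, Predicate<Tok> renewOnce) {
    Set<ToRenew> forApp = Collections.synchronizedSet(new HashSet<>());
    appTokens.put(appId, forApp);
    boolean hasHdfsToken = false;
    List<ToRenew> tokenList = new ArrayList<>();
    for (Tok t : tokens) {
      if (!t.managed) {
        continue; // no renewer manages this kind
      }
      if (HDFS_KIND.equals(t.kind)) {
        hasHdfsToken = true;
      }
      if (t.renewer.isEmpty()) {
        continue; // skipTokenRenewal(): empty renewer means "don't renew"
      }
      ToRenew dttr = allTokens.get(t.id);
      if (dttr == null) {
        dttr = new ToRenew(t);
        if (!renewOnce.test(t)) {
          // Fail before anything is scheduled for this application.
          throw new IllegalStateException("Failed to renew token: " + t.id);
        }
      }
      tokenList.add(dttr);
    }
    // Second pass: publish entries and start timers.
    for (ToRenew dtr : tokenList) {
      ToRenew current = allTokens.putIfAbsent(dtr.token.id, dtr);
      if (current != null) {
        current.referringAppIds.add(appId); // another app got there first
        forApp.add(current);
      } else {
        dtr.referringAppIds.add(appId);
        forApp.add(dtr);
        timersScheduled.add(dtr.token.id); // setTimerForTokenRenewal(dtr)
      }
    }
    if (!hasHdfsToken) {
      hdfsTokenRequests.add(appId); // requestNewHdfsDelegationTokenAsProxyUser(...)
    }
  }
}
```

Registering the same HDFS token for two applications schedules one timer and records both application IDs on the shared entry, while an application with no HDFS token at all ends up in hdfsTokenRequests.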
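The appTokens map that keeps appearing above maps each application ID to the set of its tokens. It is used to clean up an application's tokens when the application finishes, and it also backs the return value of getDelegationTokens():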

```java
public Set<Token<?>> getDelegationTokens() {
  Set<Token<?>> tokens = new HashSet<Token<?>>();
  for (Set<DelegationTokenToRenew> tokenList : appTokens.values()) {
    for (DelegationTokenToRenew token : tokenList) {
      tokens.add(token.token);
    }
  }
  return tokens;
}
```
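getDelegationTokens() is just a flatten-and-deduplicate over the values of appTokens. The same operation written with streams, with strings standing in for Token<?> objects (AppTokensView is a hypothetical name):

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of getDelegationTokens(); strings stand in for Token<?>.
final class AppTokensView {
  static Set<String> getDelegationTokens(Map<String, Set<String>> appTokens) {
    return appTokens.values().stream()
        .flatMap(Set::stream)          // all tokens of all applications
        .collect(Collectors.toSet());  // a shared token is kept once
  }
}
```

Because the result is a Set, a token shared by several applications is returned only once.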

So what does renewing a token actually do?
```java
protected void renewToken(final DelegationTokenToRenew dttr)
    throws IOException {
  // need to use doAs so that http can find the kerberos tgt
  // NOTE: token renewers should be responsible for the correct UGI!
  try {
    dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Long>() {
          @Override
          public Long run() throws Exception {
            return dttr.token.renew(dttr.conf);
          }
        });
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
  LOG.info("Renewed delegation-token= [" + dttr + "]");
}
```

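As the code shows, the renewal logic itself is simple:
1-it runs the renewal as the RM's login user (UserGroupInformation.getLoginUser().doAs()) and stores the new expiration time returned by token.renew() in dttr.expirationDate;
2-the token's lifetime is extended. The Token object itself is not modified; what moves forward is the expiration time that the issuing service records for it, which never goes past the token's max date;
3-token.renew() looks up the TokenRenewer registered for the token's kind and sends a renew RPC to the issuing service. Services such as the NameNode, ResourceManager and Timeline Server keep their tokens in memory and in their own persistent stores; renewal updates those records, so the token has not expired when a client later presents it for authentication.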
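To make the timing concrete, here is a hypothetical model of both sides. On the issuing service, a renew call is assumed to move the expiry to min(now + renewInterval, maxDate) and to refuse once maxDate has passed; this reflects my understanding of Hadoop's AbstractDelegationTokenSecretManager and should be checked against your Hadoop version. On the RM side, setTimerForTokenRenewal() is assumed to schedule the next renewal at expirationDate - expiresIn / 10, that is, after about 90% of the remaining lifetime. Times are plain long values in arbitrary units; RenewalModel and its methods are illustrative names.

```java
// Hypothetical model of token renewal timing; not Hadoop code.
final class RenewalModel {
  // What the issuing service tracks for one token.
  static final class ServerToken {
    final long maxDate; // hard limit, fixed at issue time
    long expiry;        // moves forward on every successful renewal

    ServerToken(long issueTime, long renewInterval, long maxLifetime) {
      this.maxDate = issueTime + maxLifetime;
      this.expiry = issueTime + renewInterval;
    }
  }

  // Assumed server-side renew: extend by one interval, never past maxDate.
  static long renew(ServerToken t, long now, long renewInterval) {
    if (t.maxDate < now) {
      // Assumption: Hadoop's secret manager throws SecretManager.InvalidToken here.
      throw new IllegalStateException("token is past its max date");
    }
    t.expiry = Math.min(t.maxDate, now + renewInterval);
    return t.expiry; // the RM stores this in dttr.expirationDate
  }

  // Assumed RM-side timer: renew after ~90% of the remaining lifetime.
  static long nextRenewalTime(long expirationDate, long now) {
    long expiresIn = expirationDate - now;
    if (expiresIn <= 0) {
      return -1; // already expired: no timer is set
    }
    return expirationDate - expiresIn / 10;
  }
}
```

With a 24-unit renew interval and a 168-unit max lifetime, a token renewed at time 20 expires at 44 and the RM would renew it again at 42; a renewal at time 160 is capped at the max date 168, and after that renewal is refused, which is why long-running applications eventually need a new token rather than another renewal.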

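[Copyright notice] This article is original content by a Huawei Cloud community user and may not be reproduced without permission; to reproduce it, contact the original author for authorization. If you find content in this community that may be plagiarized, please report it by email with supporting evidence; once verified, the community will remove the infringing content immediately. Report email: cloudbbs@huaweicloud.com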