[Hadoop][ResourceManager] Source Code Analysis of Application Token Renewal
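Application token renewal in the ResourceManager is event-driven and maps onto three stages of an application's lifecycle:
1 - application submission;
2 - application recovery;
3 - application completion.
Taking the submission stage as an example, the core call chain is: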
RMAppManager.submitApplication()
---DelegationTokenRenewer.addApplicationAsync()
------DelegationTokenRenewer.processDelegationTokenRenewerEvent()
private void processDelegationTokenRenewerEvent(
    DelegationTokenRenewerEvent evt) {
  serviceStateLock.readLock().lock();
  try {
    if (isServiceStarted) {
      Future<?> future =
          renewerService.submit(new DelegationTokenRenewerRunnable(evt));
      futures.put(evt, future);
    } else {
      pendingEventQueue.add(evt);
      int qSize = pendingEventQueue.size();
      if (qSize != 0 && qSize % 1000 == 0
          && lastEventQueueSizeLogged != qSize) {
        lastEventQueueSizeLogged = qSize;
        LOG.info("Size of pending " +
            "DelegationTokenRenewerEvent queue is " + qSize);
      }
    }
  } finally {
    serviceStateLock.readLock().unlock();
  }
}
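The most important thing this event-handling method does is wrap the event in a DelegationTokenRenewerRunnable task, submit it to the renewerService thread pool, and record the returned Future in the futures map. If the service has not started yet, the event is parked in pendingEventQueue instead.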
private final Map<DelegationTokenRenewerEvent, Future<?>> futures = new HashMap<>();
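The submit-or-queue pattern above can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical and only mirrors the shape of the method shown: events are plain strings, `SubmitOrQueueSketch`, `process()` and `start()` are made-up names, and draining the pending queue right after the started flag flips is an assumption about how the parked events get replayed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: run events on a pool once started, park them before that.
public class SubmitOrQueueSketch {
    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    private final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final Map<String, Future<?>> futures = new ConcurrentHashMap<>();
    private final List<String> handled =
        Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private boolean started = false; // guarded by stateLock

    // Mirrors the structure of processDelegationTokenRenewerEvent().
    public void process(String event) {
        stateLock.readLock().lock();
        try {
            if (started) {
                // Block lambda so the call binds to submit(Runnable).
                Future<?> future = pool.submit(() -> { handled.add(event); });
                futures.put(event, future);
            } else {
                pending.add(event);
            }
        } finally {
            stateLock.readLock().unlock();
        }
    }

    // Assumption: on start, flip the flag under the write lock, then replay.
    public void start() {
        stateLock.writeLock().lock();
        try {
            started = true;
        } finally {
            stateLock.writeLock().unlock();
        }
        String event;
        while ((event = pending.poll()) != null) {
            process(event);
        }
    }

    // Waits for every submitted task; wraps checked exceptions.
    public void awaitAll() {
        for (Future<?> f : futures.values()) {
            try {
                f.get();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public int handledCount() { return handled.size(); }
    public int pendingCount() { return pending.size(); }
    public void shutdown() { pool.shutdown(); }

    public static void main(String[] args) {
        SubmitOrQueueSketch s = new SubmitOrQueueSketch();
        s.process("submit:app_1"); // parked, service not started yet
        s.start();                 // replays the parked event
        s.process("submit:app_2"); // goes straight to the pool
        s.awaitAll();
        System.out.println("handled=" + s.handledCount()
            + ", pending=" + s.pendingCount());
        s.shutdown();
    }
}
```

Taking only the read lock in `process()` lets many submitters run concurrently while still seeing a consistent started flag; only the state flip needs the write lock.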
The DelegationTokenRenewerRunnable task is implemented as follows:
` public void run() {
  // Bail out if the service has not been started.
  serviceStateLock.readLock().lock();
  try {
    if (!isServiceStarted) {
      return;
    }
  } finally {
    serviceStateLock.readLock().unlock();
  }
  if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
    DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
        (DelegationTokenRenewerAppSubmitEvent) evt;
    handleDTRenewerAppSubmitEvent(appSubmitEvt);
  } else if (evt instanceof DelegationTokenRenewerAppRecoverEvent) {
    DelegationTokenRenewerAppRecoverEvent appRecoverEvt =
        (DelegationTokenRenewerAppRecoverEvent) evt;
    handleDTRenewerAppRecoverEvent(appRecoverEvt);
  } else if (evt.getType().equals(
      DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
    DelegationTokenRenewer.this.handleAppFinishEvent(evt);
  }
}`
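It handles the three kinds of events mentioned above:
1 - submit (registration) event: DelegationTokenRenewerAppSubmitEvent;
2 - recovery event: DelegationTokenRenewerAppRecoverEvent;
3 - finish event: an event whose type is DelegationTokenRenewerEventType.FINISH_APPLICATION, handled by handleAppFinishEvent().
The rest of this article walks through the submission case: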
DelegationTokenRenewer.DelegationTokenRenewerRunnable.handleDTRenewerAppSubmitEvent()
private void handleDTRenewerAppSubmitEvent(
    DelegationTokenRenewerAppSubmitEvent event) {
  /*
   * For applications submitted with delegation tokens we are not submitting
   * the application to scheduler from RMAppManager. Instead we are doing
   * it from here. The primary goal is to make token renewal as a part of
   * application submission asynchronous so that client thread is not
   * blocked during app submission.
   */
  try {
    // Setup tokens for renewal
    DelegationTokenRenewer.this.handleAppSubmitEvent(event);
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(event.getApplicationId(), RMAppEventType.START));
  } catch (Throwable t) {
    LOG.warn(
        "Unable to add the application to the delegation token renewer.", t);
    // Sending APP_REJECTED is fine, since we assume that the
    // RMApp is in NEW state and thus we haven't yet informed the
    // Scheduler about the existence of the application
    rmContext.getDispatcher().getEventHandler().handle(
        new RMAppEvent(event.getApplicationId(),
            RMAppEventType.APP_REJECTED, t.getMessage()));
  }
}
DelegationTokenRenewer.handleAppSubmitEvent()
` private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
    throws IOException, InterruptedException {
  ApplicationId applicationId = evt.getApplicationId();
  Credentials ts = evt.getCredentials();
  boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
  if (ts == null) {
    return; // nothing to add
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Registering tokens for renewal for:" +
        " appId = " + applicationId);
  }
  Collection<Token<?>> tokens = ts.getAllTokens();
  long now = System.currentTimeMillis();
  // find tokens for renewal, but don't add timers until we know
  // all renewable tokens are valid
  // At RM restart it is safe to assume that all the previously added tokens
  // are valid
  Set<DelegationTokenToRenew> delegationTokenToRenews =
      Collections.synchronizedSet(new HashSet<>());
  appTokens.put(applicationId, delegationTokenToRenews);
  Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
  boolean hasHdfsToken = false;
  for (Token<?> token : tokens) {
    if (token.isManaged()) {
      if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
        LOG.info(applicationId + " found existing hdfs token " + token);
        hasHdfsToken = true;
      }
      if (skipTokenRenewal(token)) {
        continue;
      }
      DelegationTokenToRenew dttr = allTokens.get(token);
      if (dttr == null) {
        Configuration tokenConf;
        if (evt.tokenConf != null) {
          // Override conf with app provided conf - this is required in cases
          // where RM does not have the required conf to communicate with
          // remote hdfs cluster. The conf is provided by the application
          // itself.
          tokenConf = evt.tokenConf;
          LOG.info("Using app provided token conf for renewal,"
              + " number of configs = " + tokenConf.size());
          if (LOG.isDebugEnabled()) {
            for (Iterator<Map.Entry<String, String>> itor =
                tokenConf.iterator(); itor.hasNext(); ) {
              Map.Entry<String, String> entry = itor.next();
              LOG.debug("Token conf key is " + entry.getKey()
                  + " and value is " + entry.getValue());
            }
          }
        } else {
          tokenConf = getConfig();
        }
        dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
            tokenConf, now, shouldCancelAtEnd, evt.getUser());
        try {
          renewToken(dttr);
        } catch (IOException ioe) {
          if (ioe instanceof SecretManager.InvalidToken
              && dttr.maxDate < Time.now()
              && evt instanceof DelegationTokenRenewerAppRecoverEvent
              && token.getKind().equals(HDFS_DELEGATION_KIND)) {
            LOG.info("Failed to renew hdfs token " + dttr
                + " on recovery as it expired, requesting new hdfs token for "
                + applicationId + ", user=" + evt.getUser(), ioe);
            requestNewHdfsDelegationTokenAsProxyUser(
                Arrays.asList(applicationId), evt.getUser(),
                evt.shouldCancelAtEnd());
            continue;
          }
          throw new IOException("Failed to renew token: " + dttr.token, ioe);
        }
      }
      tokenList.add(dttr);
    }
  }
  if (!tokenList.isEmpty()) {
    // Renewing token and adding it to timer calls are separated purposefully
    // If user provides incorrect token then it should not be added for
    // renewal.
    for (DelegationTokenToRenew dtr : tokenList) {
      DelegationTokenToRenew currentDtr =
          allTokens.putIfAbsent(dtr.token, dtr);
      if (currentDtr != null) {
        // another job beat us
        currentDtr.referringAppIds.add(applicationId);
        delegationTokenToRenews.add(currentDtr);
      } else {
        delegationTokenToRenews.add(dtr);
        setTimerForTokenRenewal(dtr);
      }
    }
  }
  if (!hasHdfsToken) {
    requestNewHdfsDelegationTokenAsProxyUser(Arrays.asList(applicationId),
        evt.getUser(),
        shouldCancelAtEnd);
  }
}`
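The token-renewal logic for the submit (registration) event does the following:
1 - It takes all tokens from the credentials carried by the event and filters out the ones that should not be renewed: a token must be managed (token.isManaged()) and must not be rejected by skipTokenRenewal(token), which drops tokens that have no renewer set. Only tokens with a renewer are renewed. In other words, application token renewal is just a facility the ResourceManager offers; whether to use it is decided by the compute framework itself.
2 - For each token not already tracked in allTokens it creates a DelegationTokenToRenew object and immediately renews it once. In the recovery case the HDFS token may already have expired (InvalidToken, and past its maxDate); a new HDFS token is then requested as a proxy user for the user carried in the event, because otherwise the application could still fail even after it is brought back up.
3 - It puts all DelegationTokenToRenew objects belonging to an application into the appTokens map.
4 - It adds the DelegationTokenToRenew objects created above to the renewal timer (setTimerForTokenRenewal), so that each token is renewed shortly before it expires, until it reaches its max date.
5 - If none of the tokens above is an HDFS token, it requests one from HDFS and adds it to the appTokens map.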
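Steps 2 to 4 hinge on two ideas: validate every token before registering any of them, and share one tracked entry when several applications carry the same token. The following is a minimal sketch of just that bookkeeping, under loud assumptions: tokens are plain strings, `Tracked` is a hypothetical stand-in for DelegationTokenToRenew, a token whose name starts with "bad" simulates a failed first renewal, and a counter stands in for scheduling a timer. It is single-threaded for simplicity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of two-phase token registration with shared entries.
public class TokenRegistrationSketch {
    // Stand-in for DelegationTokenToRenew: a token plus the apps that use it.
    static final class Tracked {
        final String token;
        final Set<String> referringAppIds =
            Collections.synchronizedSet(new HashSet<>());
        Tracked(String token, String appId) {
            this.token = token;
            this.referringAppIds.add(appId);
        }
    }

    private final Map<String, Tracked> allTokens = new ConcurrentHashMap<>();
    private final Map<String, Set<Tracked>> appTokens = new ConcurrentHashMap<>();
    private int timersScheduled = 0; // stands in for setTimerForTokenRenewal()

    public void register(String appId, List<String> tokens) {
        Set<Tracked> perApp = Collections.synchronizedSet(new HashSet<>());
        appTokens.put(appId, perApp);

        // Phase 1: "renew" every new token; any failure aborts before
        // anything is added to allTokens or scheduled.
        List<Tracked> validated = new ArrayList<>();
        for (String token : tokens) {
            Tracked tracked = allTokens.get(token);
            if (tracked == null) {
                if (token.startsWith("bad")) { // simulated renewal failure
                    throw new IllegalArgumentException(
                        "Failed to renew token: " + token);
                }
                tracked = new Tracked(token, appId);
            }
            validated.add(tracked);
        }

        // Phase 2: register; putIfAbsent decides who owns the timer.
        for (Tracked tracked : validated) {
            Tracked current = allTokens.putIfAbsent(tracked.token, tracked);
            if (current != null) {
                // Another application registered this token first: share it.
                current.referringAppIds.add(appId);
                perApp.add(current);
            } else {
                perApp.add(tracked);
                timersScheduled++;
            }
        }
    }

    public int timersScheduled() { return timersScheduled; }
    public boolean isTracked(String token) { return allTokens.containsKey(token); }
    public int referrers(String token) {
        Tracked t = allTokens.get(token);
        return t == null ? 0 : t.referringAppIds.size();
    }

    public static void main(String[] args) {
        TokenRegistrationSketch r = new TokenRegistrationSketch();
        r.register("app_1", List.of("hdfs-A", "hive-B"));
        r.register("app_2", List.of("hdfs-A")); // shares the hdfs-A entry
        System.out.println("timers=" + r.timersScheduled()
            + ", hdfs-A referrers=" + r.referrers("hdfs-A"));
    }
}
```

Because a shared token keeps a single entry, only the first application to register it schedules a timer; later applications just add themselves to `referringAppIds`.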
Among the code shown here, the appTokens map that keeps appearing above is read in one place: it supplies the return value of getDelegationTokens():
public Set<Token<?>> getDelegationTokens() {
  Set<Token<?>> tokens = new HashSet<Token<?>>();
  for (Set<DelegationTokenToRenew> tokenList : appTokens.values()) {
    for (DelegationTokenToRenew token : tokenList) {
      tokens.add(token.token);
    }
  }
  return tokens;
}
So what does renewing a token actually do?
protected void renewToken(final DelegationTokenToRenew dttr)
    throws IOException {
  // need to use doAs so that http can find the kerberos tgt
  // NOTE: token renewers should be responsible for the correct UGI!
  try {
    dttr.expirationDate =
        UserGroupInformation.getLoginUser().doAs(
            new PrivilegedExceptionAction<Long>() {
              @Override
              public Long run() throws Exception {
                return dttr.token.renew(dttr.conf);
              }
            });
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
  LOG.info("Renewed delegation-token= [" + dttr + "]");
}
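As the code shows, the renewal logic itself is simple:
1 - dttr.expirationDate is updated with the value returned by the renewal.
2 - The lifetime of dttr.token is extended through dttr.token.renew(dttr.conf).
3 - The TokenRenewer that matches the token's kind sends a renew RPC to the corresponding service. Services such as the NameNode, the ResourceManager and the timeline server keep tokens in memory and in their own persistent store, and update both after a renewal, so the token is not treated as expired when a client later presents it for authentication.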
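Step 3 can be pictured as a lookup by token kind followed by a service-side extension of the expiry that never goes past the max date. The sketch below is hypothetical throughout: `Renewer`, `FixedIntervalRenewer`, `withDefaults()` and the interval values are made up for illustration, no RPC is sent, and the "min(now + interval, maxDate)" rule is stated as an assumption about how a service caps the new expiry.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: pick a renewer by token kind, cap expiry at maxDate.
public class RenewerLookupSketch {
    // Stand-in for a per-kind renewer plugin.
    interface Renewer {
        boolean handleKind(String kind);
        long renew(long nowMillis, long maxDateMillis);
    }

    // Extends the expiry by a fixed interval, never beyond maxDate.
    static final class FixedIntervalRenewer implements Renewer {
        private final String kind;
        private final long intervalMillis;

        FixedIntervalRenewer(String kind, long intervalMillis) {
            this.kind = kind;
            this.intervalMillis = intervalMillis;
        }

        @Override
        public boolean handleKind(String kind) {
            return this.kind.equals(kind);
        }

        @Override
        public long renew(long nowMillis, long maxDateMillis) {
            // Assumed service-side rule: new expiry is capped by maxDate.
            return Math.min(maxDateMillis, nowMillis + intervalMillis);
        }
    }

    private final List<Renewer> renewers;

    public RenewerLookupSketch(List<Renewer> renewers) {
        this.renewers = renewers;
    }

    // Interval values here are invented for the example.
    public static RenewerLookupSketch withDefaults() {
        return new RenewerLookupSketch(Arrays.asList(
            new FixedIntervalRenewer("HDFS_DELEGATION_TOKEN", 5_000L),
            new FixedIntervalRenewer("RM_DELEGATION_TOKEN", 3_000L)));
    }

    // Returns the new expiration time for a token of the given kind.
    public long renew(String kind, long nowMillis, long maxDateMillis) {
        for (Renewer r : renewers) {
            if (r.handleKind(kind)) {
                return r.renew(nowMillis, maxDateMillis);
            }
        }
        throw new UnsupportedOperationException("No renewer for kind " + kind);
    }

    public static void main(String[] args) {
        RenewerLookupSketch s = withDefaults();
        long maxDate = 10_000L;
        System.out.println(s.renew("HDFS_DELEGATION_TOKEN", 1_000L, maxDate));
        System.out.println(s.renew("HDFS_DELEGATION_TOKEN", 8_000L, maxDate));
    }
}
```

Once the returned expiry stops moving forward because it has hit the max date, further renewals cannot help, which is why a fresh token has to be obtained at that point.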