【Hadoop】【JHS】2-JHS服务之JobHistory源码解析

举报
沙漠里的果果酱 发表于 2023/08/09 17:27:06 2023/08/09
【摘要】 【Hadoop】【JHS】2-JHS服务之JobHistory源码解析

JobHistory是JobHistoryServer进程中一个AbstractService。
该Service的核心功能就是将/mr-history/intermediate目录下的job信息转移到done目录下面。
初始化和启动方法如下:

protected void serviceInit(Configuration conf) throws Exception {
  LOG.info("JobHistory Init");
  this.conf = conf;
  this.appID = ApplicationId.newInstance(0, 0);
  this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf)
      .newRecordInstance(ApplicationAttemptId.class);
  //move线程每隔多长时间多hdfs的intermediate目录将mr-history信息转移到done目录中
  moveThreadInterval = conf.getLong(
      JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
      JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);

  hfAsyncProcessEnabled = conf.getBoolean(
          JHAdminConfig.MR_HISTORY_HF_CACHE_ASYNC_START_ENABLE,
          JHAdminConfig.DEFAULT_MR_HISTORY_HF_CACHE_ASYNC_START_ENABLE);

  LOG.info("History server will get config values to be hidden.");
  cfgHiddenInfo = conf.getConfigstobeHidden("mapreduce");
  //创建一个HistoryFileManager,该对象管理hdfs上面存储的jobhistory文件,同样是一个Service
  hsManager = createHistoryFileManager();
  //初始化HistoryFileManager
  hsManager.init(conf);
  
  initExistingAsyncOrSync();
  //创建一个HistoryStorage对象,提供了用来查询已完成job的接口
  //这里使用了HistoryStorage的子类:CachedHistoryStorage extends HistoryStorage
  storage = createHistoryStorage();

  if (storage instanceof Service) {
    ((Service) storage).init(conf);
  }
  storage.setHistoryFileManager(hsManager);

  super.serviceInit(conf);
}


protected void serviceStart() throws Exception {
  hsManager.start();
  if (storage instanceof Service) {
    ((Service) storage).start();
  }
  //这里创建了另一个调度线程池,里面添加了两个定是线程。
  //1-日志扫描线程:定期将intermediate目录中完成的job信息转移到done目录中;
  //2-清理线程,用于定时将留存时间超期的日志信息删除; 
  scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2,
      new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
          .build());
  //日志扫描线程
  scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
      moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);

  //定时清理线程
  scheduleHistoryCleaner();
  super.serviceStart();
}


上面这个两个重要的线程实现:

private class MoveIntermediateToDoneRunnable implements Runnable {
  @Override
  public void run() {
    try {
      if (isDoneCacheHs) {
        LOG.info("Starting scan to move intermediate done files");
        hsManager.scanIntermediateDirectory();
      }
    } catch (IOException e) {
      LOG.error("Error while scanning intermediate done dir ", e);
    }
  }
}
private class HistoryCleaner implements Runnable {
  public void run() {
    if (isDoneCacheHs){
      LOG.info("History Cleaner started");
      try {
        hsManager.clean();
      } catch(IOException e) {
        LOG.warn("Error trying to clean up ", e);
      }
    }
    LOG.info("History Cleaner complete");
  }
}

从上面JobHistory service的init和start方法中可以看到两个很重要的service,分别为:HistoryFileManager和HistoryStorage
HistoryFileManager:主要用来管理hdfs上面的mr history文件;
HistoryStorage:主要提供了已完成Job的接口;

尤其是还是那个面提到的定时扫描线程和定时清理线程的实现也是使用了HistoryFileManager的接口来操作mr history文件;
HistoryFileManager虽然是一个Service,但是很轻量,初始化方法中仅仅初始化了一些全局变量;没有重写start方法;
该类主要是一些工具类;

有了上面的背景之后,下面分析下如何获取一个job的信息。
JobHistory.getJob()

public Job getJob(JobId jobId) {
  return storage.getFullJob(jobId);
}

CachedHistoryStorage.getFullJob()


public Job getFullJob(JobId jobId) {
  Job retVal = null;
  try {
    retVal = loadedJobCache.getUnchecked(jobId);
  } catch (UncheckedExecutionException e) {
    if (e.getCause() instanceof HSFileRuntimeException) {
      LOG.error(e.getCause().getMessage());
      return null;
    } else {
      throw new YarnRuntimeException(e.getCause());
    }
  }
  return retVal;
}

这里的CachedHistoryStorage就是在HistoryServer中定义的storage对象。
 private LoadingCache<JobId, Job> loadedJobCache = null;
其中loadedJobCache定义如上。
类型为LoadingCache,来自Guava框架。具体LoadingCache的使用方法会在另一篇博客中详细介绍。
简单描述如下:
LoadingCache是Cache的子接口,相比较于Cache,当从LoadingCache中读取一个指定key的记录时,如果该记录不存在,则LoadingCache可以自动执行加载数据到缓存的操作。
与构建Cache类型的对象类似,LoadingCache类型的对象也是通过CacheBuilder进行构建,不同的是,在调用CacheBuilder的build方法时,必须传递一个CacheLoader类型的参数,CacheLoader的load方法需要我们提供实现。当调用LoadingCache的get方法时,如果缓存不存在对应key的记录,则CacheLoader中的load方法会被自动调用从外存加载数据,load方法的返回值会作为key对应的value存储到LoadingCache中,并从get方法返回。

因此,在获取job信息的场景中,如果缓存中存在该job的信息,那么直接返回;如果不存在缓存信息,那么会调用Cache初始化时指定的回调函数(loadJob)从hdfs的intermediate目录下面拉取该job的信息到缓存中,如果在hdfs的intermediate目录中仍然没有找到该job的信息,则报错;
回调函数为:CachedHistoryStorage.loadJob()

private Job loadJob(JobId jobId) throws RuntimeException, IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Looking for Job " + jobId);
  }
  HistoryFileInfo fileInfo;
  //上面提到过HistoryFileManager:主要用来管理hdfs上面的mr history文件;
  fileInfo = hsManager.getFileInfo(jobId);

  if (fileInfo == null) {
    throw new HSFileRuntimeException("Unable to find job " + jobId);
  }
  //如果该job的history文件信息在IN-INTERMEDIATE状态中,那么需要等待该文件被转移到DONE目录中
  fileInfo.waitUntilMoved();

  if (fileInfo.isDeleted()) {
    throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
  } else {
    //根据上面获取的job相关的历史文件(JobHistoryFile)信息解析出Job的信息
    return fileInfo.loadJob();
  }
}


HistoryFileManager.getFileInfo()

public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
  // 先从缓存中尝试获取该Job的信息
  HistoryFileInfo fileInfo = jobListCache.get(jobId);
  if (fileInfo != null) {
    return fileInfo;
  }
  // 缓存中如果没有命中,那么开始在intermediate目录中查找,
  scanIntermediateDirectory();
  fileInfo = jobListCache.get(jobId);
  if (fileInfo != null) {
    return fileInfo;
  }

  // 如果intermediate目录中没有找见,那么就会在done目录下面找。
  fileInfo = scanOldDirsForJob(jobId);
  if (fileInfo != null) {
    return fileInfo;
  }
  return null;
}

这里又冒出了一个HistoryFileManager.jobListCache对象;类型为:
protected JobListCache jobListCache = null;
查看JobListCache类型不难发现,其实就是一个Map.
那么问题来了,CachedHistoryStorage.loadedJobCache和HistoryFileManager.jobListCache虽然实现不一样,但是都是以JobID为key的map或者类map结构用来进行缓存。
而且看代码会发现,他们的缓存区大小相等。都是由参数:mapreduce.jobhistory.joblist.cache.size决定。
到底有什么区别?为什么要有两级缓存?
CachedHistoryStorage.loadedJobCache
该缓存为上层缓存,里面缓存了最近访问的Job信息。如果最近一次访问的job信息在该缓存中不存在,那么会从HistoryFileManager.jobListCache缓存中去拿,如果拿不到就去hdfs上面拿;
HistoryFileManager.jobListCache
该缓存并不是像CachedHistoryStorage.loadedJobCache缓存一样,为了加速用户访问,或者说并不是那么直接的加速用户访问。它里面总是存放了最新完成的job的历史信息。

HistoryFileManager.jobListCache的更新过程如下:
jobHistory服务初始化的时候启动了下面的定时扫描线程:JobHistory.MoveIntermediateToDoneRunnable,定时扫描/mr-history/tmp目录,该目录中为没有完成的job的信息,定时扫描将已经完成的job的信息转移为done目录下面。

private class MoveIntermediateToDoneRunnable implements Runnable {
  @Override
  public void run() {
    try {
      if (isDoneCacheHs) {
        LOG.info("Starting scan to move intermediate done files");
        hsManager.scanIntermediateDirectory();
      }
    } catch (IOException e) {
      LOG.error("Error while scanning intermediate done dir ", e);
    }
  }
}

HistoryFileManager.scanIntermediateDirectory()

void scanIntermediateDirectory() throws IOException {
  // TODO it would be great to limit how often this happens, except in the
  // case where we are looking for a particular job.
  // intermediateDoneDirPath为hdfs上面的/mr-history/tmp目录

  List<FileStatus> userDirList = JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
  LOG.debug("Scanning intermediate dirs");
  for (FileStatus userDir : userDirList) {
    String name = userDir.getPath().getName();
    UserLogDir dir = userDirModificationTimeMap.get(name);
    if(dir == null) {
      dir = new UserLogDir();
      UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
      if(old != null) {
        dir = old;
      }
    }
    dir.scanIfNeeded(userDir);
  }
}


public synchronized void scanIfNeeded(FileStatus fs) {
  long newModTime = fs.getModificationTime();
  boolean alwaysScan = conf.getBoolean(
      JHAdminConfig.MR_HISTORY_ALWAYS_SCAN_USER_DIR,
      JHAdminConfig.DEFAULT_MR_HISTORY_ALWAYS_SCAN_USER_DIR);
  if (alwaysScan || modTime != newModTime
      || (scanTime/1000) == (modTime/1000)
      || (scanTime/1000 + 1) == (modTime/1000)) {
    // reset scanTime before scanning happens
    scanTime = System.currentTimeMillis();
    Path p = fs.getPath();
    try {
      scanIntermediateDirectory(p);
      //If scanning fails, we will scan again.  We assume the failure is
      // temporary.
      modTime = newModTime;
    } catch (IOException e) {
      LOG.error("Error while trying to scan the directory " + p, e);
    }
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Scan not needed of " + fs.getPath());
    }
    // reset scanTime
    scanTime = System.currentTimeMillis();
  }
}


/**
 * 扫描特定的intermediate路径,并且将扫描的intermediate添加到缓存jobListCache中。
 * 
 * @param absPath
 * @throws IOException
 */
private void scanIntermediateDirectory(final Path absPath) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Scanning intermediate dir " + absPath);
  }
  //这里有两个参数:
  //1-absPath为要扫描的某一个特定job的目录
  //2-intermediateDoneDirFC。为根据/mr-history/tmp构建的FileContext
  List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath, intermediateDoneDirFc);
  if (LOG.isDebugEnabled()) {
      LOG.debug("Found " + fileStatusList.size() + " files");
  }
  //遍历某一个job下面所有的文件
  for (FileStatus fs : fileStatusList) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("scanning file: "+ fs.getPath());
    }
    JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
    //获取job.conf文件名称
    String confFileName = JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
    //获取summary文件名称
    String summaryFileName = JobHistoryUtils.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
    //使用读取上面的文件信息构造一个记录Job基本信息的HistoryFileInfo对象,并将其添加到缓存中
    HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs.getPath().getParent(), confFileName), new Path(fs.getPath()
                              .getParent(), summaryFileName), jobIndexInfo, false);
    //addIfAbsent使用java的putIfAbsent实现。如果对于已经存在的key,那么将会返回老值。
    //在这里意味着如果一个这个job存在,那么意味这个这个job已经被加入到缓存中。那么不需要重复加入
    final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
    //如果缓存中没有这个Job的信息,那么说明这是一个新的job
    if (old == null || old.didMoveFail()) {
      final HistoryFileInfo found = (old == null) ? fileInfo : old;
      //maxHistoryAge:历史清除器运行时会删除早于当前值的Job历史文件。maxHistory表示job结束后留存时间
      long cutoff = System.currentTimeMillis() - maxHistoryAge;
      if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
        try {
          //超过留存时间的job就不用再将其从tmp目录转移到done目录下面了。只有对于没有超过留存时间的job才需要将其转移到done目录下面。
          found.delete();
        } catch (IOException e) {
          LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Scheduling move to done of " +found);
        }
        //定时线程里面再启动一个一次性线程,专门用来将该job存tmp目录下面转移到done目录下面。
        moveToDoneExecutor.execute(new Runnable() {
          @Override
          public void run() {
            try {
              found.moveToDone();
            } catch (IOException e) {
              LOG.info("Failed to process fileInfo for job: " + 
                  found.getJobId(), e);
            }
          }
        });
      }
    //如果缓存中有这个Job的信息,那么说明这是一个重复的Job信息,在done目录中将其删除。
    } else if (!old.isMovePending()) {
      //This is a duplicate so just delete it
      if (LOG.isDebugEnabled()) {
        LOG.debug("Duplicate: deleting");
      }
      fileInfo.delete();
    }
  }
}

HistoryFileManager.HistoryFileInfo.moveToDone()

synchronized void moveToDone() throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("moveToDone: " + historyFile);
  }
  if (!isMovePending()) {
    // It was either deleted or is already in done. Either way do nothing
    if (LOG.isDebugEnabled()) {
      LOG.debug("Move no longer pending");
    }
    return;
  }
  try {
    long completeTime = jobIndexInfo.getFinishTime();
    if (completeTime == 0) {
      completeTime = System.currentTimeMillis();
    }
    JobId jobId = jobIndexInfo.getJobId();

    if (historyFile == null) {
      LOG.info("No file for job-history with " + jobId + " found in cache!");
    }

    if (confFile == null) {
      LOG.info("No file for jobConf with " + jobId + " found in cache!");
    }

    if (summaryFile == null || !intermediateDoneDirFc.util().exists(
        summaryFile)) {
      LOG.info("No summary file for job: " + jobId);
    } else {
      String jobSummaryString = getJobSummary(intermediateDoneDirFc,
          summaryFile);
      SUMMARY_LOG.info(jobSummaryString);
      LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
      intermediateDoneDirFc.delete(summaryFile, false);
      summaryFile = null;
    }

    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
    addDirectoryToSerialNumberIndex(targetDir);
    //在hdfs的mr-history/done目录上面创建一个时间戳和jobID为前缀的子路径,用来存放job信息
    makeDoneSubdir(targetDir);
    if (historyFile != null) {
      Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
          .getName()));
      if (!toPath.equals(historyFile)) {
        moveToDoneNow(historyFile, toPath);
        historyFile = toPath;
      }
    }
    if (confFile != null) {
      Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
          .getName()));
      if (!toPath.equals(confFile)) {
        moveToDoneNow(confFile, toPath);
        confFile = toPath;
      }
    }
    //拷贝完成之后要将该Job的HistoryInfo状态设置为IN_DONE,表示该job信息已经被转移到done目录中
    state = HistoryInfoState.IN_DONE;
  } catch (Throwable t) {
    LOG.error("Error while trying to move a job to done", t);
    this.state = HistoryInfoState.MOVE_FAILED;
  } finally {
    notifyAll();
  }
}
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。