Hive虚拟列的生成与计算【3】

举报
想要一只猫 发表于 2021/09/08 00:17:13 2021/09/08
【摘要】 本文主要解析介绍Hive虚拟列INPUT__FILE__NAME相关源码

介绍

        前两篇文章分别介绍了虚拟列的生成时机和虚拟列生成依赖的一些对象。接下来的系列文章将介绍个各个虚拟列的具体生成流程,本文将解析INPUT__FILE__NAME的相关源码。

INPUT__FILE__NAME

        populateVirtualColumnValues获取filename时,首先需要调用ctx.inputFileChanged()来更新输入文件路径,同时判断文件是否需要更新。对于CombineHiveInputFormat,当一个文件读取结束后,会读取下一个文件path。

  public static Object[] populateVirtualColumnValues(ExecMapperContext ctx, List<VirtualColumn> vcs,
    Object[] vcValues, Deserializer deserializer) {
    ......
    for (int i = 0; i < vcs.size(); i++) {
      switch(vcs.get(i)) {
        case FILENAME :
          if (ctx.inputFileChanged()) {
            vcValues[i] = new Text(ctx.getCurrentInputPath().toString());
          }
          break;
          .......
       }
   }
   ......
}

     inputFileChecked通过resetRow()重置,resetRow()在每处理一条数据都会被调用。

  1.  对于MR作业,resetRow()是在ExecMapper.map内部调用;
  2.  对于Tez作业,resetRow()是MapRecordSource.pushRecord内部调用。

     因为inputFileChecked每读取条数据后都会更新,会导致每次对一行数据设置INPUT__FILE__NAME,都会重新执行lastInputPath.equals(currentInputPath)来判断文件是否变化,但是对于同一个输入文件,这个比较判断是不必要的,这里是一个小小的优化点。

// from ExecMapperContext

  public boolean inputFileChanged() {
    if (!inputFileChecked) {
      currentInputPath = this.ioCxt.getInputPath();
      inputFileChecked = true;
    }
    return lastInputPath == null || !lastInputPath.equals(currentInputPath);
  }

  public void resetRow() {
    // 保存上一条数据的inputpath
    lastInputPath = currentInputPath;
    inputFileChecked = false;
  }

    在inputFileChecked为false时,更新当前行的文件路径currentInputPath,this.ioCxt.getInputPath()里的inputPath是HiveContextAwareRecordReader.initIOContext处设置。

    那么initIOContext是什么时候被调用的?答案是在每次文件都去完后,重新初始化RecordReader时被调用。对于MR作业,是CombineHiveRecordReader;对于Tez作业,是HiveRecordReader。

    而初始化过程被包装为函数initNextRecordReader:

     如果是MR作业,则是HadoopShimsSecure$CombineFileRecordReader.initNextRecordReader。

     如果是Tez作业,则是TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader。

// from HiveContextAwareRecordReader

private void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
  ioCxtRef = this.getIOContext();
  ioCxtRef.setCurrentBlockStart(startPos);
  ioCxtRef.setBlockPointer(isBlockPointer);
  ioCxtRef.setInputPath(inputPath); // 设置inputPath
  initDone = true;
}

      可以看到HadoopShimsSecure$CombineFileRecordReader.next在一个文件读取完之后,会初始化读取下一个文件。TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next也是一样。

// from HadoopShimsSecure$CombineFileRecordReader

public boolean next(K key, V value) throws IOException {

  while ((curReader == null)
      || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
          value)) {
    if (!initNextRecordReader(key)) {
      return false;
    }
  }
  return true;
}


protected boolean initNextRecordReader(K key) throws IOException {

  RecordReader preReader = curReader; //it is OK, curReader is closed, for we only need footer buffer info from preReader.
  if (curReader != null) {
	// 关闭当前RecordReader
	curReader.close();
	curReader = null;
	if (idx > 0) {
	  progress += split.getLength(idx - 1); // done processing so far
	}
  }
  ......
  // 如果所有文件都被读取,则返回false
  if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) {
	return false;
  }
  // 初始化下一个CombineHiveRecordReader
  curReader = rrConstructor.newInstance(new Object[]
		{split, jc, reporter, Integer.valueOf(idx), preReader});
  ......
  idx++;
  return true;
}

    CombineHiveRecordReader构造函数通过partition(注意这里的partition非分区的意思,如上代码,是单纯的一个数组index)定位需要读取的文件路径。通过调用initIOContext初始化IO信息。

// from CombineHiveRecordReader

public CombineHiveRecordReader(InputSplit split, Configuration conf,
      Reporter reporter, Integer partition, RecordReader preReader) {
    CombineHiveInputSplit hsplit = split instanceof CombineHiveInputSplit ?
        (CombineHiveInputSplit) split :
        new CombineHiveInputSplit(jobConf, (CombineFileSplit) split);
    ......
    // create a split for the given partition
    FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
        .getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit
        .getLocations());

    this.setRecordReader(inputFormat.getRecordReader(fsplit, jobConf, reporter));
    // 这里调用initIOContext设置inputPath
    this.initIOContext(fsplit, jobConf, inputFormatClass, this.recordReader);
    .......
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。