Hive虚拟列的生成与计算【4】

举报
想要一只猫 发表于 2021/09/09 23:17:24 2021/09/09
【摘要】 本文主要解析介绍Hive虚拟列BLOCK__OFFSET__INSIDE__FILE相关源码

介绍

      上文解析了虚拟列INPUT__FILE__NAME的相关源码,本文将继续解析BLOCK__OFFSET__INSIDE__FILE列的生成相关的源码。

BLOCAK__OFFSET__INSIDE__FILE

      还是populateVirtualColumnValues函数,从源码可知BLOCK__OFFSET__INSIDE__FILE来源于IOContext的currenBlockStart。接下来我们再看这个currentBlockStart是什么时候设置的。

// from MapOperator

  public static Object[] populateVirtualColumnValues(ExecMapperContext ctx,
      List<VirtualColumn> vcs, Object[] vcValues, Deserializer deserializer) {
    ......
      switch(vcs.get(i)) {
        ......
        case BLOCKOFFSET: {
          long current = ctx.getIoCxt().getCurrentBlockStart();
          LongWritable old = (LongWritable) vcValues[i];
          if (old == null) {
            old = new LongWritable(current);
            vcValues[i] = old;
            continue;
          }
          if (current != old.get()) {
            old.set(current);
          }
        }
        ......
    }
    ......
  }

      首先是initIOContext,从前文我们知道,initIOContext是在根据新的文件初始化RecordReader的时候会被调用,而入参startPos正是新文件分片的起始offset。

// from HiveContextAwareRecordReader

  public void initIOContext(FileSplit split, JobConf job,
      Class inputFormatClass, RecordReader recordReader) throws IOException {

    boolean blockPointer = false;
    long blockStart = -1;
    FileSplit fileSplit = split;
    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(job); 
   // 如果是RCFile和SequenceFile(Block Compressed),则blockPointer=true;
    if (inputFormatClass.getName().contains("SequenceFile")) {
      SequenceFile.Reader in = new SequenceFile.Reader(fs, path, job);
      blockPointer = in.isBlockCompressed();
      in.sync(fileSplit.getStart());
      blockStart = in.getPosition();
      in.close();
    } else if (recordReader instanceof RCFileRecordReader) {
      blockPointer = true;
      blockStart = ((RCFileRecordReader) recordReader).getStart();
    } else if (inputFormatClass.getName().contains("RCFile")) {
      blockPointer = true;
      RCFile.Reader in = new RCFile.Reader(fs, path, job);
      in.sync(fileSplit.getStart());
      blockStart = in.getPosition();
      in.close();
    }
    this.jobConf = job;
    this.initIOContext(blockStart, blockPointer, path.makeQualified(fs));

    this.initIOContextSortedProps(split, recordReader, job);
  }

  private void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
    ioCxtRef = this.getIOContext();
    ioCxtRef.setCurrentBlockStart(startPos);
    ioCxtRef.setBlockPointer(isBlockPointer);
    ioCxtRef.setInputPath(inputPath);
    LOG.debug("Processing file " + inputPath); // Logged at INFO in multiple other places.
    initDone = true;
  }

      对于每条数据,currentBlocakStart都会重新设置,这是通过HiveContextAwareRecordReader.updateIOContext()来实现的。可以看到updateIOContext的执行逻辑依赖于isBlockPointer,isBlockPointer也是在initIOContext设置的:对于ORC/Parquet/Text格式,isBlockPointer为false;对于RCFile和SequenceFile,isBlockPointer为true。

      无论是什么格式,currentBlockStart的值都来源于pointPos,这个值来源于getPos(),getPos()在HiveContextAwareRecordReader并未实现此方法,而是依赖于其子类的getPos()。对于MR任务,就是CombineHiveRecordReader.getPos();对于Tez任务,就是HiveRecordReader.getPos()。

      最终是调用各个文件格式的RecordReader的getPos(),比如ORC,就是OrcRecordReader.getPos(),如果是Parquet,则是ParquetRecordReaderWrapper.getPos()。具体的实现方式根据不同的格式有不同的计算方式。

// from HiveContextAwareRecordReader

protected void updateIOContext() throws IOException {
  long pointerPos = this.getPos();
  // 如果是ORC/Parquet/Text,isBlockPointer=false
  if (!ioCxtRef.isBlockPointer()) {
    ioCxtRef.setCurrentBlockStart(pointerPos);
    ioCxtRef.setCurrentRow(0);
    return;
  }

  // RCFile和SequenceFile会执行以下代码
  ioCxtRef.setCurrentRow(ioCxtRef.getCurrentRow() + 1);

  if (ioCxtRef.getNextBlockStart() == -1) {
    ioCxtRef.setNextBlockStart(pointerPos);
    ioCxtRef.setCurrentRow(0);
  }
  if (pointerPos != ioCxtRef.getNextBlockStart()) {
    // the reader pointer has moved to the end of next block, or the end of
    // current record.

    ioCxtRef.setCurrentRow(0);

    if (ioCxtRef.getCurrentBlockStart() == ioCxtRef.getNextBlockStart()) {
      ioCxtRef.setCurrentRow(1);
    }

    ioCxtRef.setCurrentBlockStart(ioCxtRef.getNextBlockStart());
    ioCxtRef.setNextBlockStart(pointerPos);
  }
}

ORC  BLOCK__OFFSET__INSIDE__FILE

      之前提到ORC文件的getPos()最终是调用OrcRecordReader.getPos(),本节将介绍OrcRecordReader.getPos()是如何计算的。

      我们看代码,offset即split的start offset,length就是split的length,而progress最终来自ORC Reader的实现RecordReaderImpl.getProgress()。

// OrcRecordReader

   public long getPos() throws IOException {
      // offset 为split start
      // length为split end - start
      return offset + (long) (progress * length);
   }

    public boolean next(NullWritable key, OrcStruct value) throws IOException {
      if (reader.hasNext()) {
        reader.next(value);
        // 这里reader是org.apache.orc.impl.RecordReaderImpl
        progress = reader.getProgress();
        return true;
      } else {
        return false;
      }
    }

      我们再看RecordReaderImpl的getProgress(),解释一下各个变量的含义:

      rowBaseInStripe:已经读取的stripe的总行数

      rowInStripe:这一行在当前读取的stripe的行数

      totalRowCout:当前split包含的所有的stripe的行数之和。

//from  org.apache.orc.impl.RecordReaderImpl

public float getProgress() {
    return ((float)this.rowBaseInStripe + (float)this.rowInStripe) / (float)this.totalRowCount;
}

      下图是一个ORC split的示意,不过里面的蓝色部分表达不是特别准确,我们知道ORC是列式存储的,每个stripe里面又由多个row group组成,蓝色在这里只是读取行进度的概念,而非长度的概念。综合起来,OrcRecordReader.getPos()最终返回的就是读取当前split的offset的一个估值,对于ORC这种列式存储来说,行的Offset几乎是不存在这个含义的。

      其他文件格式也大概是这么个调用流程,作者后续会再补充。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。