Hive虚拟列的生成与计算【4】
介绍
上文解析了虚拟列INPUT__FILE__NAME的相关源码,本文将继续解析BLOCK__OFFSET__INSIDE__FILE列的生成相关的源码。
BLOCAK__OFFSET__INSIDE__FILE
还是populateVirtualColumnValues函数,从源码可知BLOCK__OFFSET__INSIDE__FILE来源于IOContext的currenBlockStart。接下来我们再看这个currentBlockStart是什么时候设置的。
// from MapOperator
  public static Object[] populateVirtualColumnValues(ExecMapperContext ctx,
      List<VirtualColumn> vcs, Object[] vcValues, Deserializer deserializer) {
    ......
      switch(vcs.get(i)) {
        ......
        case BLOCKOFFSET: {
          long current = ctx.getIoCxt().getCurrentBlockStart();
          LongWritable old = (LongWritable) vcValues[i];
          if (old == null) {
            old = new LongWritable(current);
            vcValues[i] = old;
            continue;
          }
          if (current != old.get()) {
            old.set(current);
          }
        }
        ......
    }
    ......
  }
首先是initIOContext,从前文我们知道,initIOContext是在根据新的文件初始化RecordReader的时候会被调用,而入参startPos正是新文件分片的起始offset。
// from HiveContextAwareRecordReader
  public void initIOContext(FileSplit split, JobConf job,
      Class inputFormatClass, RecordReader recordReader) throws IOException {
    boolean blockPointer = false;
    long blockStart = -1;
    FileSplit fileSplit = split;
    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(job); 
   // 如果是RCFile和SequenceFile(Block Compressed),则blockPointer=true;
    if (inputFormatClass.getName().contains("SequenceFile")) {
      SequenceFile.Reader in = new SequenceFile.Reader(fs, path, job);
      blockPointer = in.isBlockCompressed();
      in.sync(fileSplit.getStart());
      blockStart = in.getPosition();
      in.close();
    } else if (recordReader instanceof RCFileRecordReader) {
      blockPointer = true;
      blockStart = ((RCFileRecordReader) recordReader).getStart();
    } else if (inputFormatClass.getName().contains("RCFile")) {
      blockPointer = true;
      RCFile.Reader in = new RCFile.Reader(fs, path, job);
      in.sync(fileSplit.getStart());
      blockStart = in.getPosition();
      in.close();
    }
    this.jobConf = job;
    this.initIOContext(blockStart, blockPointer, path.makeQualified(fs));
    this.initIOContextSortedProps(split, recordReader, job);
  }
  private void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
    ioCxtRef = this.getIOContext();
    ioCxtRef.setCurrentBlockStart(startPos);
    ioCxtRef.setBlockPointer(isBlockPointer);
    ioCxtRef.setInputPath(inputPath);
    LOG.debug("Processing file " + inputPath); // Logged at INFO in multiple other places.
    initDone = true;
  }
对于每条数据,currentBlocakStart都会重新设置,这是通过HiveContextAwareRecordReader.updateIOContext()来实现的。可以看到updateIOContext的执行逻辑依赖于isBlockPointer,isBlockPointer也是在initIOContext设置的:对于ORC/Parquet/Text格式,isBlockPointer为false;对于RCFile和SequenceFile,isBlockPointer为true。
无论是什么格式,currentBlockStart的值都来源于pointPos,这个值来源于getPos(),getPos()在HiveContextAwareRecordReader并未实现此方法,而是依赖于其子类的getPos()。对于MR任务,就是CombineHiveRecordReader.getPos();对于Tez任务,就是HiveRecordReader.getPos()。
最终是调用各个文件格式的RecordReader的getPos(),比如ORC,就是OrcRecordReader.getPos(),如果是Parquet,则是ParquetRecordReaderWrapper.getPos()。具体的实现方式根据不同的格式有不同的计算方式。
// from HiveContextAwareRecordReader
protected void updateIOContext() throws IOException {
  long pointerPos = this.getPos();
  // 如果是ORC/Parquet/Text,isBlockPointer=false
  if (!ioCxtRef.isBlockPointer()) {
    ioCxtRef.setCurrentBlockStart(pointerPos);
    ioCxtRef.setCurrentRow(0);
    return;
  }
  // RCFile和SequenceFile会执行以下代码
  ioCxtRef.setCurrentRow(ioCxtRef.getCurrentRow() + 1);
  if (ioCxtRef.getNextBlockStart() == -1) {
    ioCxtRef.setNextBlockStart(pointerPos);
    ioCxtRef.setCurrentRow(0);
  }
  if (pointerPos != ioCxtRef.getNextBlockStart()) {
    // the reader pointer has moved to the end of next block, or the end of
    // current record.
    ioCxtRef.setCurrentRow(0);
    if (ioCxtRef.getCurrentBlockStart() == ioCxtRef.getNextBlockStart()) {
      ioCxtRef.setCurrentRow(1);
    }
    ioCxtRef.setCurrentBlockStart(ioCxtRef.getNextBlockStart());
    ioCxtRef.setNextBlockStart(pointerPos);
  }
}
ORC BLOCK__OFFSET__INSIDE__FILE
之前提到ORC文件的getPos()最终是调用OrcRecordReader.getPos(),本节将介绍OrcRecordReader.getPos()是如何计算的。
我们看代码,offset即split的start offset,length就是split的length,而progress最终来自ORC Reader的实现RecordReaderImpl.getProgress()。
// OrcRecordReader
   public long getPos() throws IOException {
      // offset 为split start
      // length为split end - start
      return offset + (long) (progress * length);
   }
    public boolean next(NullWritable key, OrcStruct value) throws IOException {
      if (reader.hasNext()) {
        reader.next(value);
        // 这里reader是org.apache.orc.impl.RecordReaderImpl
        progress = reader.getProgress();
        return true;
      } else {
        return false;
      }
    }
我们再看RecordReaderImpl的getProgress(),解释一下各个变量的含义:
rowBaseInStripe:已经读取的stripe的总行数
rowInStripe:这一行在当前读取的stripe的行数
totalRowCout:当前split包含的所有的stripe的行数之和。
//from  org.apache.orc.impl.RecordReaderImpl
public float getProgress() {
    return ((float)this.rowBaseInStripe + (float)this.rowInStripe) / (float)this.totalRowCount;
}
下图是一个ORC split的示意,不过里面的蓝色部分表达不是特别准确,我们知道ORC是列式存储的,每个stripe里面又由多个row group组成,蓝色在这里只是读取行进度的概念,而非长度的概念。综合起来,OrcRecordReader.getPos()最终返回的就是读取当前split的offset的一个估值,对于ORC这种列式存储来说,行的Offset几乎是不存在这个含义的。

其他文件格式也大概是这么个调用流程,作者后续会再补充。
- 点赞
 - 收藏
 - 关注作者
 
            
           
评论(0)