Hive虚拟列的生成与计算【4】
介绍
上文解析了虚拟列INPUT__FILE__NAME的相关源码,本文将继续解析BLOCK__OFFSET__INSIDE__FILE列的生成相关的源码。
BLOCAK__OFFSET__INSIDE__FILE
还是populateVirtualColumnValues函数,从源码可知BLOCK__OFFSET__INSIDE__FILE来源于IOContext的currenBlockStart。接下来我们再看这个currentBlockStart是什么时候设置的。
// from MapOperator
public static Object[] populateVirtualColumnValues(ExecMapperContext ctx,
List<VirtualColumn> vcs, Object[] vcValues, Deserializer deserializer) {
......
switch(vcs.get(i)) {
......
case BLOCKOFFSET: {
long current = ctx.getIoCxt().getCurrentBlockStart();
LongWritable old = (LongWritable) vcValues[i];
if (old == null) {
old = new LongWritable(current);
vcValues[i] = old;
continue;
}
if (current != old.get()) {
old.set(current);
}
}
......
}
......
}
首先是initIOContext,从前文我们知道,initIOContext是在根据新的文件初始化RecordReader的时候会被调用,而入参startPos正是新文件分片的起始offset。
// from HiveContextAwareRecordReader
public void initIOContext(FileSplit split, JobConf job,
Class inputFormatClass, RecordReader recordReader) throws IOException {
boolean blockPointer = false;
long blockStart = -1;
FileSplit fileSplit = split;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(job);
// 如果是RCFile和SequenceFile(Block Compressed),则blockPointer=true;
if (inputFormatClass.getName().contains("SequenceFile")) {
SequenceFile.Reader in = new SequenceFile.Reader(fs, path, job);
blockPointer = in.isBlockCompressed();
in.sync(fileSplit.getStart());
blockStart = in.getPosition();
in.close();
} else if (recordReader instanceof RCFileRecordReader) {
blockPointer = true;
blockStart = ((RCFileRecordReader) recordReader).getStart();
} else if (inputFormatClass.getName().contains("RCFile")) {
blockPointer = true;
RCFile.Reader in = new RCFile.Reader(fs, path, job);
in.sync(fileSplit.getStart());
blockStart = in.getPosition();
in.close();
}
this.jobConf = job;
this.initIOContext(blockStart, blockPointer, path.makeQualified(fs));
this.initIOContextSortedProps(split, recordReader, job);
}
private void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
ioCxtRef = this.getIOContext();
ioCxtRef.setCurrentBlockStart(startPos);
ioCxtRef.setBlockPointer(isBlockPointer);
ioCxtRef.setInputPath(inputPath);
LOG.debug("Processing file " + inputPath); // Logged at INFO in multiple other places.
initDone = true;
}
对于每条数据,currentBlocakStart都会重新设置,这是通过HiveContextAwareRecordReader.updateIOContext()来实现的。可以看到updateIOContext的执行逻辑依赖于isBlockPointer,isBlockPointer也是在initIOContext设置的:对于ORC/Parquet/Text格式,isBlockPointer为false;对于RCFile和SequenceFile,isBlockPointer为true。
无论是什么格式,currentBlockStart的值都来源于pointPos,这个值来源于getPos(),getPos()在HiveContextAwareRecordReader并未实现此方法,而是依赖于其子类的getPos()。对于MR任务,就是CombineHiveRecordReader.getPos();对于Tez任务,就是HiveRecordReader.getPos()。
最终是调用各个文件格式的RecordReader的getPos(),比如ORC,就是OrcRecordReader.getPos(),如果是Parquet,则是ParquetRecordReaderWrapper.getPos()。具体的实现方式根据不同的格式有不同的计算方式。
// from HiveContextAwareRecordReader
protected void updateIOContext() throws IOException {
long pointerPos = this.getPos();
// 如果是ORC/Parquet/Text,isBlockPointer=false
if (!ioCxtRef.isBlockPointer()) {
ioCxtRef.setCurrentBlockStart(pointerPos);
ioCxtRef.setCurrentRow(0);
return;
}
// RCFile和SequenceFile会执行以下代码
ioCxtRef.setCurrentRow(ioCxtRef.getCurrentRow() + 1);
if (ioCxtRef.getNextBlockStart() == -1) {
ioCxtRef.setNextBlockStart(pointerPos);
ioCxtRef.setCurrentRow(0);
}
if (pointerPos != ioCxtRef.getNextBlockStart()) {
// the reader pointer has moved to the end of next block, or the end of
// current record.
ioCxtRef.setCurrentRow(0);
if (ioCxtRef.getCurrentBlockStart() == ioCxtRef.getNextBlockStart()) {
ioCxtRef.setCurrentRow(1);
}
ioCxtRef.setCurrentBlockStart(ioCxtRef.getNextBlockStart());
ioCxtRef.setNextBlockStart(pointerPos);
}
}
ORC BLOCK__OFFSET__INSIDE__FILE
之前提到ORC文件的getPos()最终是调用OrcRecordReader.getPos(),本节将介绍OrcRecordReader.getPos()是如何计算的。
我们看代码,offset即split的start offset,length就是split的length,而progress最终来自ORC Reader的实现RecordReaderImpl.getProgress()。
// OrcRecordReader
public long getPos() throws IOException {
// offset 为split start
// length为split end - start
return offset + (long) (progress * length);
}
public boolean next(NullWritable key, OrcStruct value) throws IOException {
if (reader.hasNext()) {
reader.next(value);
// 这里reader是org.apache.orc.impl.RecordReaderImpl
progress = reader.getProgress();
return true;
} else {
return false;
}
}
我们再看RecordReaderImpl的getProgress(),解释一下各个变量的含义:
rowBaseInStripe:已经读取的stripe的总行数
rowInStripe:这一行在当前读取的stripe的行数
totalRowCout:当前split包含的所有的stripe的行数之和。
//from org.apache.orc.impl.RecordReaderImpl
public float getProgress() {
return ((float)this.rowBaseInStripe + (float)this.rowInStripe) / (float)this.totalRowCount;
}
下图是一个ORC split的示意,不过里面的蓝色部分表达不是特别准确,我们知道ORC是列式存储的,每个stripe里面又由多个row group组成,蓝色在这里只是读取行进度的概念,而非长度的概念。综合起来,OrcRecordReader.getPos()最终返回的就是读取当前split的offset的一个估值,对于ORC这种列式存储来说,行的Offset几乎是不存在这个含义的。
其他文件格式也大概是这么个调用流程,作者后续会再补充。
- 点赞
- 收藏
- 关注作者
评论(0)