Hive虚拟列的生成与计算【3】
介绍
前两篇文章分别介绍了虚拟列的生成时机和虚拟列生成依赖的一些对象。接下来的系列文章将介绍个各个虚拟列的具体生成流程,本文将解析INPUT__FILE__NAME的相关源码。
INPUT__FILE__NAME
populateVirtualColumnValues获取filename时,首先需要调用ctx.inputFileChanged()来更新输入文件路径,同时判断文件是否需要更新。对于CombineHiveInputFormat,当一个文件读取结束后,会读取下一个文件path。
public static Object[] populateVirtualColumnValues(ExecMapperContext ctx, List<VirtualColumn> vcs,
Object[] vcValues, Deserializer deserializer) {
......
for (int i = 0; i < vcs.size(); i++) {
switch(vcs.get(i)) {
case FILENAME :
if (ctx.inputFileChanged()) {
vcValues[i] = new Text(ctx.getCurrentInputPath().toString());
}
break;
.......
}
}
......
}
inputFileChecked通过resetRow()重置,resetRow()在每处理一条数据都会被调用。
- 对于MR作业,resetRow()是在ExecMapper.map内部调用;
- 对于Tez作业,resetRow()是MapRecordSource.pushRecord内部调用。
因为inputFileChecked每读取条数据后都会更新,会导致每次对一行数据设置INPUT__FILE__NAME,都会重新执行lastInputPath.equals(currentInputPath)来判断文件是否变化,但是对于同一个输入文件,这个比较判断是不必要的,这里是一个小小的优化点。
// from ExecMapperContext
public boolean inputFileChanged() {
if (!inputFileChecked) {
currentInputPath = this.ioCxt.getInputPath();
inputFileChecked = true;
}
return lastInputPath == null || !lastInputPath.equals(currentInputPath);
}
public void resetRow() {
// 保存上一条数据的inputpath
lastInputPath = currentInputPath;
inputFileChecked = false;
}
在inputFileChecked为false时,更新当前行的文件路径currentInputPath,this.ioCxt.getInputPath()里的inputPath是HiveContextAwareRecordReader.initIOContext处设置。
那么initIOContext是什么时候被调用的?答案是在每次文件都去完后,重新初始化RecordReader时被调用。对于MR作业,是CombineHiveRecordReader;对于Tez作业,是HiveRecordReader。
而初始化过程被包装为函数initNextRecordReader:
如果是MR作业,则是HadoopShimsSecure$CombineFileRecordReader.initNextRecordReader。
如果是Tez作业,则是TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader。
// from HiveContextAwareRecordReader
private void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
ioCxtRef = this.getIOContext();
ioCxtRef.setCurrentBlockStart(startPos);
ioCxtRef.setBlockPointer(isBlockPointer);
ioCxtRef.setInputPath(inputPath); // 设置inputPath
initDone = true;
}
可以看到HadoopShimsSecure$CombineFileRecordReader.next在一个文件读取完之后,会初始化读取下一个文件。TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next也是一样。
// from HadoopShimsSecure$CombineFileRecordReader
public boolean next(K key, V value) throws IOException {
while ((curReader == null)
|| !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
value)) {
if (!initNextRecordReader(key)) {
return false;
}
}
return true;
}
protected boolean initNextRecordReader(K key) throws IOException {
RecordReader preReader = curReader; //it is OK, curReader is closed, for we only need footer buffer info from preReader.
if (curReader != null) {
// 关闭当前RecordReader
curReader.close();
curReader = null;
if (idx > 0) {
progress += split.getLength(idx - 1); // done processing so far
}
}
......
// 如果所有文件都被读取,则返回false
if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) {
return false;
}
// 初始化下一个CombineHiveRecordReader
curReader = rrConstructor.newInstance(new Object[]
{split, jc, reporter, Integer.valueOf(idx), preReader});
......
idx++;
return true;
}
CombineHiveRecordReader构造函数通过partition(注意这里的partition非分区的意思,如上代码,是单纯的一个数组index)定位需要读取的文件路径。通过调用initIOContext初始化IO信息。
// from CombineHiveRecordReader
public CombineHiveRecordReader(InputSplit split, Configuration conf,
Reporter reporter, Integer partition, RecordReader preReader) {
CombineHiveInputSplit hsplit = split instanceof CombineHiveInputSplit ?
(CombineHiveInputSplit) split :
new CombineHiveInputSplit(jobConf, (CombineFileSplit) split);
......
// create a split for the given partition
FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
.getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit
.getLocations());
this.setRecordReader(inputFormat.getRecordReader(fsplit, jobConf, reporter));
// 这里调用initIOContext设置inputPath
this.initIOContext(fsplit, jobConf, inputFormatClass, this.recordReader);
.......
}
- 点赞
- 收藏
- 关注作者
评论(0)