Generation and Computation of Hive Virtual Columns [2]
1. Introduction
The previous article covered when Hive assigns values to virtual columns: INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, ROW__OFFSET__INSIDE__BLOCK, RAW__DATA__SIZE, and ROW__ID are assigned while MapOperator executes, whereas GROUPING__ID is assigned in GroupByOperator. This article introduces the two key objects that virtual column generation depends on: ExecMapperContext and IOContext.
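As a quick reminder of what these columns look like from the user's side, here is a minimal sketch that reads two of them over Hive JDBC; the HiveServer2 URL and the table name src are placeholders, not from this article.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VirtualColumnQuery {
  public static void main(String[] args) throws Exception {
    // Placeholder HiveServer2 endpoint and sample table.
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(
             "SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE FROM src LIMIT 5")) {
      while (rs.next()) {
        // INPUT__FILE__NAME is a string, BLOCK__OFFSET__INSIDE__FILE a bigint.
        System.out.println(rs.getString(1) + " @ " + rs.getLong(2));
      }
    }
  }
}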
2. ExecMapperContext
The populateVirtualColumnValues function of MapOperator, covered in the previous article, takes an ExecMapperContext ctx parameter. As its code shows, virtual columns such as INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, ROW__OFFSET__INSIDE__BLOCK, and ROW__ID are all derived from ctx or from ctx.getIoCxt().
ExecMapperContext holds context information for a MapTask. It is created when the map task is set up, alongside the MapOperator (the code below shows it is instantiated first), and is then handed to the MapOperator and its downstream operators.
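As an abridged sketch of that dependency (a hypothetical helper reduced from the populateVirtualColumnValues logic; error handling and the remaining columns are omitted):

import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Each value comes either from ctx itself or from ctx.getIoCxt().
static void fillVirtualColumns(ExecMapperContext ctx, Object[] vcValues) {
  // INPUT__FILE__NAME: path of the file backing the current split
  vcValues[0] = new Text(ctx.getCurrentInputPath().toString());
  // BLOCK__OFFSET__INSIDE__FILE: start offset of the block being read
  vcValues[1] = new LongWritable(ctx.getIoCxt().getCurrentBlockStart());
  // ROW__ID: record identifier maintained by ACID-aware readers
  vcValues[2] = ctx.getIoCxt().getRecordIdentifier();
}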
public abstract class Operator<T extends OperatorDesc> implements Serializable, Cloneable, Node {
  ......
  private transient ExecMapperContext execContext;
  ......
}
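The handoff to downstream operators goes through Operator#setExecContext, which stores the reference and pushes it down the operator tree. A simplified rendering of that propagation (abridged; details of the actual method are omitted):

public void setExecContext(ExecMapperContext execContext) {
  this.execContext = execContext;
  if (childOperators != null) {
    for (Operator<? extends OperatorDesc> child : childOperators) {
      // The whole operator subtree shares the same context instance.
      child.setExecContext(execContext);
    }
  }
}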
For MapReduce jobs, the MapOperator and the ExecMapperContext are both created when ExecMapper is initialized, in its configure method.
public class ExecMapper extends MapReduceBase implements Mapper {
  private AbstractMapOperator mo;
  private ExecMapperContext execContext = null;
  ......
  public void configure(JobConf job) {
    execContext = new ExecMapperContext(job);
    ......
    if (mrwork.getVectorMode()) {
      mo = new VectorMapOperator(runtimeCtx);
    } else {
      mo = new MapOperator(runtimeCtx);
    }
    ......
  }
  ......
}
For Tez jobs, they are created when MapRecordProcessor is initialized: the ExecMapperContext in the constructor, and the MapOperator in init(), which also hands the context to the operator.
public class MapRecordProcessor extends RecordProcessor {
  private AbstractMapOperator mapOp;
  ......
  public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
    ......
    execContext = new ExecMapperContext(jconf);
    ......
  }

  void init(MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
    ......
    if (mapWork.getVectorMode()) {
      mapOp = new VectorMapOperator(runtimeCtx);
    } else {
      mapOp = new MapOperator(runtimeCtx);
    }
    mapOp.setExecContext(execContext);
    ......
  }
}
3. IOContext
IOContext is a field of ExecMapperContext and holds the state the virtual columns are computed from; how that state is populated will be covered in the article on virtual column generation.
public class IOContext {
  private long currentBlockStart;
  private long nextBlockStart;
  private long currentRow;
  private boolean isBlockPointer;
  private boolean ioExceptions;

  // Are we using the fact the input is sorted
  private boolean useSorted = false;
  // Are we currently performing a binary search
  private boolean isBinarySearching = false;
  // Do we want to end the binary search
  private boolean endBinarySearch = false;
  // The result of the comparison of the last row processed
  private Comparison comparison = null;
  // The class name of the generic UDF being used by the filter
  private String genericUDFClassName = null;

  /**
   * supports {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID}
   */
  private RecordIdentifier ri;
  ......
}
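These fields are kept current by the read path while a split is being consumed. A hypothetical illustration of the reader-side bookkeeping, assuming setters that mirror the private fields above (setCurrentBlockStart, setNextBlockStart, setCurrentRow are assumptions, not quoted from Hive):

// Hypothetical reader-side bookkeeping, not Hive code.
void onNewBlock(IOContext ioCxt, long blockStart, long nextBlockStart) {
  ioCxt.setCurrentBlockStart(blockStart);  // feeds BLOCK__OFFSET__INSIDE__FILE
  ioCxt.setNextBlockStart(nextBlockStart);
  ioCxt.setCurrentRow(0);                  // row counter behind ROW__OFFSET__INSIDE__BLOCK
}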
The IOContext is obtained from IOContextMap at the moment the ExecMapperContext instance is created.
// from ExecMapperContext
public ExecMapperContext(JobConf jc) {
  this.jc = jc;
  ioCxt = IOContextMap.get(jc);
}

// from IOContextMap
public static IOContext get(Configuration conf) {
  if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
    return sparkThreadLocal.get();
  }
  String inputName = conf.get(Utilities.INPUT_NAME);
  if (inputName == null) {
    inputName = DEFAULT_CONTEXT;
  }
  String attemptId = threadAttemptId.get();
  ConcurrentHashMap<String, IOContext> map;
  if (attemptId == null) {
    map = globalMap;
  } else {
    map = attemptMap.get(attemptId);
    if (map == null) {
      map = new ConcurrentHashMap<>();
      ConcurrentHashMap<String, IOContext> oldMap = attemptMap.putIfAbsent(attemptId, map);
      if (oldMap != null) {
        map = oldMap;
      }
    }
  }
  IOContext ioContext = map.get(inputName);
  if (ioContext != null) return ioContext;
  ioContext = new IOContext();
  IOContext oldContext = map.putIfAbsent(inputName, ioContext);
  return (oldContext == null) ? ioContext : oldContext;
}
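The tail of get() is the classic putIfAbsent get-or-create idiom: two threads racing on the same inputName may both construct an IOContext, but only the winner's instance is kept, and both callers end up sharing it. A self-contained demonstration of the same pattern (a hypothetical GetOrCreateDemo class, not Hive code):

import java.util.concurrent.ConcurrentHashMap;

public class GetOrCreateDemo {
  private static final ConcurrentHashMap<String, Object> MAP = new ConcurrentHashMap<>();

  static Object get(String key) {
    Object existing = MAP.get(key);
    if (existing != null) {
      return existing;
    }
    Object created = new Object();
    Object old = MAP.putIfAbsent(key, created);
    // If another thread won the race, discard our copy and use theirs.
    return (old == null) ? created : old;
  }

  public static void main(String[] args) {
    System.out.println(get("a") == get("a")); // true: one shared instance per key
  }
}

Because of this sharing, every component in the same task attempt that asks for the same input name sees the same IOContext instance, which is what lets the reader-side updates reach the virtual column values produced in MapOperator.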