Generation and Computation of Hive Virtual Columns
1. Introduction
Virtual columns were introduced in Hive 0.8.0. As of 2021, Hive supports the following virtual columns: INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, ROW__OFFSET__INSIDE__BLOCK, RAW__DATA__SIZE, ROW__ID, GROUPING__ID, and ROW__IS__DELETED (HIVE-24855, the newest one, not covered in this article).
Apart from ROW__ID (vectorized by HIVE-17116) and ROW__IS__DELETED (vectorized by HIVE-24855), virtual columns are currently not supported in vectorized execution, so this article only discusses how virtual columns are generated and computed under non-vectorized execution.
2. When does Hive assign values to virtual columns?
For non-vectorized execution, the values are assigned in MapOperator (with the exception of GROUPING__ID). hasVC() is called first to check whether any virtual columns are referenced; if so, populateVirtualColumnValues computes their values and appends them to the row. The call chain is MapOperator.process -> MapOperator.readRow.
So when is MapOperator.process executed?
- For MapReduce jobs, it is invoked from ExecMapper.run.
- For Tez jobs, it is invoked from MapRecordSource.processRow.
The call chain is shown in the code below:
// from MapOperator
private Object readRow(Writable value, ExecMapperContext context) throws SerDeException {
  Object deserialized = deserializer.deserialize(value);
  Object row = partTblObjectInspectorConverter.convert(deserialized);
  // check whether any virtual columns are present
  if (hasVC()) {
    rowWithPartAndVC[0] = row;
    if (context != null) {
      populateVirtualColumnValues(context, vcs, vcValues, deserializer);
    }
    int vcPos = isPartitioned() ? 2 : 1;
    rowWithPartAndVC[vcPos] = vcValues;
    return rowWithPartAndVC;
  } else if (isPartitioned()) {
    rowWithPart[0] = row;
    return rowWithPart;
  }
  return row;
}

private boolean hasVC() {
  return vcsObjectInspector != null;
}
public static Object[] populateVirtualColumnValues(ExecMapperContext ctx,
    List<VirtualColumn> vcs, Object[] vcValues, Deserializer deserializer) {
  if (vcs == null) {
    return vcValues;
  }
  if (vcValues == null) {
    vcValues = new Object[vcs.size()];
  }
  for (int i = 0; i < vcs.size(); i++) {
    switch(vcs.get(i)) {
    case FILENAME: // set INPUT__FILE__NAME
      if (ctx.inputFileChanged()) {
        vcValues[i] = new Text(ctx.getCurrentInputPath().toString());
      }
      break;
    case BLOCKOFFSET: { // set BLOCK__OFFSET__INSIDE__FILE
      long current = ctx.getIoCxt().getCurrentBlockStart();
      LongWritable old = (LongWritable) vcValues[i];
      if (old == null) {
        old = new LongWritable(current);
        vcValues[i] = old;
        continue;
      }
      if (current != old.get()) {
        old.set(current);
      }
    }
    break;
    case ROWOFFSET: { // set ROW__OFFSET__INSIDE__BLOCK
      long current = ctx.getIoCxt().getCurrentRow();
      LongWritable old = (LongWritable) vcValues[i];
      if (old == null) {
        old = new LongWritable(current);
        vcValues[i] = old;
        continue;
      }
      if (current != old.get()) {
        old.set(current);
      }
    }
    break;
    case RAWDATASIZE: // set RAW__DATA__SIZE
      long current = 0L;
      SerDeStats stats = deserializer.getSerDeStats();
      if (stats != null) {
        current = stats.getRawDataSize();
      }
      LongWritable old = (LongWritable) vcValues[i];
      if (old == null) {
        old = new LongWritable(current);
        vcValues[i] = old;
        continue;
      }
      if (current != old.get()) {
        old.set(current);
      }
      break;
    case ROWID: // set ROW__ID
      if (ctx.getIoCxt().getRecordIdentifier() == null) {
        vcValues[i] = null;
      } else {
        if (vcValues[i] == null) {
          vcValues[i] = new Object[RecordIdentifier.Field.values().length];
        }
        RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().getRecordIdentifier(), (Object[]) vcValues[i]);
        ctx.getIoCxt().setRecordIdentifier(null); // so we don't accidentally cache the value; shouldn't
        // happen since IO layer either knows how to produce ROW__ID or not - but to be safe
      }
      break;
    }
  }
  return vcValues;
}
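One detail worth calling out in populateVirtualColumnValues: for BLOCK__OFFSET__INSIDE__FILE, ROW__OFFSET__INSIDE__BLOCK, and RAW__DATA__SIZE, the LongWritable holder is allocated once on the first row and then mutated in place on subsequent rows, avoiding a per-row allocation. A minimal standalone sketch of that pattern (MutableLong is a hypothetical stand-in for Hadoop's LongWritable, not a Hive class):

```java
public class VcHolderReuse {
    // Hypothetical stand-in for org.apache.hadoop.io.LongWritable.
    static final class MutableLong {
        long value;
        MutableLong(long v) { value = v; }
    }

    // Update slot i of vcValues with the current offset, allocating the
    // holder only on the first row and mutating it in place afterwards.
    static MutableLong updateSlot(Object[] vcValues, int i, long current) {
        MutableLong old = (MutableLong) vcValues[i];
        if (old == null) {
            old = new MutableLong(current); // first row: allocate once
            vcValues[i] = old;
        } else if (current != old.value) {
            old.value = current;            // later rows: mutate in place
        }
        return old;
    }
}
```

Downstream operators therefore see the same holder object on every row; only its contents change between rows.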
The GROUPING__ID value is actually computed during SQL compilation; at runtime the map-side GroupByOperator assigns it to each row in process. Note that referencing GROUPING__ID explicitly requires WITH ROLLUP, WITH CUBE, or GROUPING SETS. For example:
select a, sum(b), GROUPING__ID from t group by a; -- throws an error
select a, sum(b), GROUPING__ID from t group by a WITH ROLLUP; -- OK
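WITH ROLLUP over n group-by keys is internally rewritten as n+1 grouping sets: (k1..kn), (k1..kn-1), ..., (). Under the bit convention used in the GroupByOperator code in this article (a set bit means the key is absent, so for keys (a, b, c) the set (a, c) has id 0b010), the rollup ids follow a simple pattern. rollupGroupingIds below is a hypothetical illustration, not Hive code, and note that the user-visible GROUPING__ID encoding has changed across Hive versions:

```java
public class RollupIds {
    // WITH ROLLUP over n keys expands to n+1 grouping sets; with the
    // "set bit = key absent" convention their ids are 0, 1, 3, ..., 2^n - 1,
    // since the i-th set drops the i trailing keys.
    static long[] rollupGroupingIds(int n) {
        long[] ids = new long[n + 1];
        for (int i = 0; i <= n; i++) {
            ids[i] = (1L << i) - 1; // low i bits set: i trailing keys dropped
        }
        return ids;
    }
}
```

For group by a, b, c with rollup this yields 0 for (a, b, c), 1 for (a, b), 3 for (a), and 7 for the grand total.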
// from GroupByOperator
protected void initializeOp(Configuration hconf) throws HiveException {
  ......
  // are grouping sets present? (WITH ROLLUP / WITH CUBE are both rewritten as grouping sets)
  groupingSetsPresent = conf.isGroupingSetsPresent();
  if (groupingSetsPresent) {
    // the list of GROUPING__ID values
    groupingSets = conf.getListGroupingSets();
    // the number of grouping-set key columns; e.g. for GROUPING SETS ((a, b, c), (a, c), (a), (c), ()), groupingSetsPosition is 3
    groupingSetsPosition = conf.getGroupingSetPosition();
    newKeysGroupingSets = new LongWritable[groupingSets.size()];
    groupingSetsBitSet = new FastBitSet[groupingSets.size()];
    int pos = 0;
    for (Long groupingSet: groupingSets) {
      // Create the mapping corresponding to the grouping set
      newKeysGroupingSets[pos] = new LongWritable(groupingSet);
      // the bitset marks the key positions to set to NULL; e.g. for (a, c) in the grouping sets above, the bitset is 010, meaning column b must be set to NULL
      groupingSetsBitSet[pos] = groupingSet2BitSet(groupingSet, groupingSetsPosition);
      pos++;
    }
  }
  ......
}
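The groupingSet2BitSet call above maps a grouping-set id onto the key positions that must be nulled. A standalone sketch using java.util.BitSet in place of Hive's FastBitSet, following the same convention as the comment above (for keys (a, b, c), id 0b010 nulls position 1, i.e. column b):

```java
import java.util.BitSet;

public class GroupingSetBits {
    // Interpret the grouping-set id bit by bit, from the lowest bit
    // (last key column) to the highest (first key column); a set bit
    // means that key is absent from the grouping set and must be NULLed.
    static BitSet groupingSet2BitSet(long groupingSet, int length) {
        BitSet bits = new BitSet(length);
        for (int pos = length - 1; pos >= 0; pos--) {
            if ((groupingSet & 1L) != 0) {
                bits.set(pos);
            }
            groupingSet >>= 1;
        }
        return bits;
    }
}
```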
In process, GROUPING__ID is assigned to each concrete row using the values precomputed in newKeysGroupingSets:
// from GroupByOperator
public void process(Object row, int tag) throws HiveException {
  ......
  if (groupingSetsPresent) {
    Object[] newKeysArray = newKeys.getKeyArray(); // return keys
    Object[] cloneNewKeysArray = new Object[newKeysArray.length];
    for (int keyPos = 0; keyPos < groupingSetsPosition; keyPos++) {
      cloneNewKeysArray[keyPos] = newKeysArray[keyPos];
    }
    for (int groupingSetPos = 0; groupingSetPos < groupingSets.size(); groupingSetPos++) {
      for (int keyPos = 0; keyPos < groupingSetsPosition; keyPos++) {
        newKeysArray[keyPos] = null;
      }
      FastBitSet bitset = groupingSetsBitSet[groupingSetPos];
      // Some keys need to be left to null corresponding to that grouping set.
      // restore the original values at clear-bit positions: for group by a, b, c with bitset 010, keyPos 0 and 2 are clear bits and keep their values, while position 1 (set bit) stays null
      for (int keyPos = bitset.nextClearBit(0); keyPos < groupingSetsPosition; keyPos = bitset.nextClearBit(keyPos + 1)) {
        newKeysArray[keyPos] = cloneNewKeysArray[keyPos];
      }
      // assign the GROUPING__ID value to this row
      newKeysArray[groupingSetsPosition] = newKeysGroupingSets[groupingSetPos];
      processKey(row, rowInspector);
    }
  } else {
    processKey(row, rowInspector);
  }
  ......
}
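The effect of the loop above can be modeled standalone: each input row is emitted once per grouping set, with absent keys nulled and the grouping-set id appended in the extra key slot that carries GROUPING__ID. expand below is a hypothetical simplification of what GroupByOperator.process does, not Hive's API:

```java
import java.util.ArrayList;
import java.util.List;

public class GroupingSetExpansion {
    // Emit one key row per grouping set: copy the original keys, null out
    // the positions whose bit is set in the grouping-set id, and append
    // the id itself as the GROUPING__ID slot.
    static List<Object[]> expand(Object[] keys, long[] groupingSets) {
        int n = keys.length;
        List<Object[]> out = new ArrayList<>();
        for (long gs : groupingSets) {
            Object[] row = new Object[n + 1];
            for (int pos = 0; pos < n; pos++) {
                boolean absent = ((gs >> (n - 1 - pos)) & 1L) != 0;
                row[pos] = absent ? null : keys[pos];
            }
            row[n] = gs; // the GROUPING__ID value for this expanded row
            out.add(row);
        }
        return out;
    }
}
```

For keys (x, y, z) and grouping sets {0b010, 0b111}, this yields (x, null, z, 2) and (null, null, null, 7), matching the nulling behavior described in the comments above.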
This article has covered when each virtual column gets its value; a follow-up article will detail how these values are generated.