Generation and Computation of Hive Virtual Columns [1]

Posted by 想要一只猫 on 2021/09/02 00:38:57
[Abstract] This article analyzes the Hive source code related to virtual columns.

1. Introduction

    Virtual columns were introduced in Hive 0.8.0. As of 2021, Hive supports the following virtual columns: INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, ROW__OFFSET__INSIDE__BLOCK, RAW__DATA__SIZE, ROW__ID, GROUPING__ID, and ROW__IS__DELETED (HIVE-24855, the newest virtual column, not covered in this article).

    Currently, apart from ROW__ID (vectorized by HIVE-17116) and ROW__IS__DELETED (vectorized by HIVE-24855), virtual columns do not support vectorized execution, so this article only covers how virtual columns are generated and computed under non-vectorized execution.
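As a quick usage sketch before diving into the source (the table `t` and its column `key` are assumed here purely for illustration), virtual columns are selected like ordinary columns:

```sql
-- Which file, and which byte offset inside that file, each row came from.
-- Assumes a table t with a column key.
SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, key
FROM t;
```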

2. When does Hive assign values to virtual columns?

    For non-vectorized execution, values are assigned in MapOperator (GROUPING__ID excepted). First, hasVC() checks whether any virtual columns are present; if so, populateVirtualColumnValues computes their values and adds them to the row. The call chain is MapOperator.process → MapOperator.readRow.

    So when is MapOperator.process itself invoked?

  1.   For MapReduce jobs, it is called from ExecMapper.run.
  2.   For Tez jobs, it is called from MapRecordSource.processRow.

    The corresponding code is shown below:


// from MapOperator
  private Object readRow(Writable value, ExecMapperContext context) throws SerDeException {
      Object deserialized = deserializer.deserialize(value);
      Object row = partTblObjectInspectorConverter.convert(deserialized);
      // check whether any virtual columns are present
      if (hasVC()) {
        rowWithPartAndVC[0] = row;
        if (context != null) {
          populateVirtualColumnValues(context, vcs, vcValues, deserializer);
        }
        int vcPos = isPartitioned() ? 2 : 1;
        rowWithPartAndVC[vcPos] = vcValues;
        return  rowWithPartAndVC;
      } else if (isPartitioned()) {
        rowWithPart[0] = row;
        return rowWithPart;
      }
      return row;
  }

  private boolean hasVC() {
      return vcsObjectInspector != null;
  }

  public static Object[] populateVirtualColumnValues(ExecMapperContext ctx,
      List<VirtualColumn> vcs, Object[] vcValues, Deserializer deserializer) {
    if (vcs == null) {
      return vcValues;
    }
    if (vcValues == null) {
      vcValues = new Object[vcs.size()];
    }
    for (int i = 0; i < vcs.size(); i++) {
      switch(vcs.get(i)) {
        case FILENAME :      // set INPUT__FILE__NAME
          if (ctx.inputFileChanged()) {
            vcValues[i] = new Text(ctx.getCurrentInputPath().toString());
          }
          break;
        case BLOCKOFFSET: {  // set BLOCK__OFFSET__INSIDE__FILE
          long current = ctx.getIoCxt().getCurrentBlockStart();
          LongWritable old = (LongWritable) vcValues[i];
          if (old == null) {
            old = new LongWritable(current);
            vcValues[i] = old;
            continue;
          }
          if (current != old.get()) {
            old.set(current);
          }
        }
        break;
        case ROWOFFSET: {  // set ROW__OFFSET__INSIDE__BLOCK
          long current = ctx.getIoCxt().getCurrentRow();
          LongWritable old = (LongWritable) vcValues[i];
          if (old == null) {
            old = new LongWritable(current);
            vcValues[i] = old;
            continue;
          }
          if (current != old.get()) {
            old.set(current);
          }
        }
        break;
        case RAWDATASIZE:  // set RAW__DATA__SIZE
          long current = 0L;
          SerDeStats stats = deserializer.getSerDeStats();
          if(stats != null) {
            current = stats.getRawDataSize();
          }
          LongWritable old = (LongWritable) vcValues[i];
          if (old == null) {
            old = new LongWritable(current);
            vcValues[i] = old;
            continue;
          }
          if (current != old.get()) {
            old.set(current);
          }
          break;
        case ROWID:    // set ROW__ID
          if(ctx.getIoCxt().getRecordIdentifier() == null) {
            vcValues[i] = null;
          }
          else {
            if(vcValues[i] == null) {
              vcValues[i] = new Object[RecordIdentifier.Field.values().length];
            }
            RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().getRecordIdentifier(), (Object[])vcValues[i]);
            ctx.getIoCxt().setRecordIdentifier(null);//so we don't accidentally cache the value; shouldn't
            //happen since IO layer either knows how to produce ROW__ID or not - but to be safe
          }
          break;
      }
    }
    return vcValues;
  }
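One detail worth noting above: for the offset columns, the code mutates a cached LongWritable in place instead of allocating a new object per row. A minimal self-contained sketch of that reuse pattern (MutableLong is a hypothetical stand-in for Hadoop's LongWritable, so this compiles without Hadoop on the classpath):

```java
public class WritableReuse {
    // Hypothetical stand-in for org.apache.hadoop.io.LongWritable.
    static final class MutableLong {
        private long value;
        MutableLong(long value) { this.value = value; }
        long get() { return value; }
        void set(long value) { this.value = value; }
    }

    // vcValues[i] is allocated once on the first row, then mutated in
    // place on subsequent rows, mirroring the BLOCKOFFSET/ROWOFFSET cases.
    static MutableLong update(Object[] vcValues, int i, long current) {
        MutableLong old = (MutableLong) vcValues[i];
        if (old == null) {              // first row: allocate once
            old = new MutableLong(current);
            vcValues[i] = old;
            return old;
        }
        if (current != old.get()) {     // later rows: update in place
            old.set(current);
        }
        return old;
    }

    public static void main(String[] args) {
        Object[] vcValues = new Object[1];
        MutableLong first = update(vcValues, 0, 100L);
        MutableLong second = update(vcValues, 0, 200L);
        // same object is reused; only its value changes
        System.out.println((first == second) + " " + second.get()); // true 200
    }
}
```

This avoids one allocation per row per virtual column on the hot path of the map task.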

    The GROUPING__ID value is actually computed during SQL compilation; the map-side GroupByOperator assigns it to each row in process. Note that referencing the GROUPING__ID column explicitly requires WITH ROLLUP, WITH CUBE, or GROUPING SETS. For example:

    select a, sum(b), GROUPING__ID from t group by a;             -- error

    select a, sum(b), GROUPING__ID from t group by a WITH ROLLUP; -- correct
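To make the GROUPING__ID values concrete, here is a small sketch that computes them for GROUP BY a, b, c. It assumes the Hive 2.3.0+ semantics from HIVE-16102: each group-by column contributes one bit, the leftmost column is the most significant bit, and the bit is 1 when the column is absent from (aggregated away in) the grouping set; earlier Hive versions used a different encoding:

```java
import java.util.List;

public class GroupingIds {
    // One bit per group-by column, leftmost column = most significant bit;
    // bit is 1 when the column is NOT part of the grouping set.
    static long groupingId(List<String> groupByCols, List<String> groupingSet) {
        long id = 0;
        for (String col : groupByCols) {
            id = (id << 1) | (groupingSet.contains(col) ? 0 : 1);
        }
        return id;
    }

    public static void main(String[] args) {
        List<String> cols = List.of("a", "b", "c");
        // GROUPING SETS ((a,b,c), (a,c), (a), (c), ())
        System.out.println(groupingId(cols, List.of("a", "b", "c"))); // 0 (000)
        System.out.println(groupingId(cols, List.of("a", "c")));      // 2 (010)
        System.out.println(groupingId(cols, List.of("a")));           // 3 (011)
        System.out.println(groupingId(cols, List.of("c")));           // 6 (110)
        System.out.println(groupingId(cols, List.of())));             // 7 (111)
    }
}
```

These are exactly the long values stored in the groupingSets list that initializeOp reads below.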

// from GroupByOperator

  protected void initializeOp(Configuration hconf) throws HiveException {
    ......
    // whether grouping sets are present (WITH ROLLUP and WITH CUBE are both rewritten into the grouping sets form)
    groupingSetsPresent = conf.isGroupingSetsPresent();
    if (groupingSetsPresent) {
      // the list of GROUPING__ID values
      groupingSets = conf.getListGroupingSets();
      // number of grouping-key columns; e.g. for GROUPING SETS ((a, b, c), (a, c), (a), (c), ()), groupingSetsPosition is 3
      groupingSetsPosition = conf.getGroupingSetPosition();
      newKeysGroupingSets = new LongWritable[groupingSets.size()];
      groupingSetsBitSet = new FastBitSet[groupingSets.size()];

      int pos = 0;
      for (Long groupingSet: groupingSets) {
        // Create the mapping corresponding to the grouping set
        newKeysGroupingSets[pos] = new LongWritable(groupingSet);
        // the bit set marks the key positions to be set to null; e.g. for (a, c) in the grouping sets above, the bit set is 010, meaning column b must be set to NULL
        groupingSetsBitSet[pos] = groupingSet2BitSet(groupingSet, groupingSetsPosition);
        pos++;
      }
    }
    ......
  }
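A simplified, self-contained equivalent of groupingSet2BitSet is sketched below. It uses java.util.BitSet in place of Hive's internal FastBitSet, and assumes (as in the GROUPING__ID encoding) that the leftmost group-by column corresponds to the most significant bit of the grouping-set value; the exact Hive implementation may differ in details:

```java
import java.util.BitSet;

public class GroupingSetBits {
    // Bit i of the result means: key position i must be set to NULL.
    static BitSet groupingSet2BitSet(long value, int length) {
        BitSet bits = new BitSet(length);
        for (int pos = 0; pos < length; pos++) {
            // bit (length-1-pos) of value corresponds to key position pos
            if (((value >> (length - 1 - pos)) & 1L) != 0) {
                bits.set(pos);
            }
        }
        return bits;
    }

    public static void main(String[] args) {
        // grouping set (a, c) over GROUP BY a, b, c -> value 0b010 = 2
        BitSet bits = groupingSet2BitSet(2L, 3);
        System.out.println(bits); // {1} -> only b (keyPos 1) is nulled
    }
}
```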

    In process, the GROUPING__ID value from newKeysGroupingSets is assigned to each concrete row:

// from GroupByOperator

public void process(Object row, int tag) throws HiveException {
    ......
    if (groupingSetsPresent) {
        Object[] newKeysArray = newKeys.getKeyArray(); // return keys
        Object[] cloneNewKeysArray = new Object[newKeysArray.length];
        
        for (int keyPos = 0; keyPos < groupingSetsPosition; keyPos++) {
               cloneNewKeysArray[keyPos] = newKeysArray[keyPos];
        }

        for (int groupingSetPos = 0; groupingSetPos < groupingSets.size(); groupingSetPos++) {
            
            for (int keyPos = 0; keyPos < groupingSetsPosition; keyPos++) {
                newKeysArray[keyPos] = null;
            }

            FastBitSet bitset = groupingSetsBitSet[groupingSetPos];
            // Some keys need to be left to null corresponding to that grouping set.
            // keep the original value at every clear bit: for group by a, b, c with bit set 010, keyPos 0 and 2 are clear bits and keep their values, while keyPos 1 (b) becomes null
            for (int keyPos = bitset.nextClearBit(0); keyPos < groupingSetsPosition; keyPos = bitset.nextClearBit(keyPos+1)) {
                 newKeysArray[keyPos] = cloneNewKeysArray[keyPos];
            }
            // assign the GROUPING__ID value to the current row
             newKeysArray[groupingSetsPosition] = newKeysGroupingSets[groupingSetPos];
             processKey(row, rowInspector);
         }
    } else {
         processKey(row, rowInspector);
    }
    ......
}
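Putting the pieces together: the process loop above effectively emits one key row per grouping set, nulling out the masked key positions and appending the grouping-set value as the GROUPING__ID key. A self-contained simulation of that behavior (a hypothetical helper, not Hive code) for an input row (a=1, b=2, c=3) with GROUPING SETS ((a,b,c), (a,c), ()):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupingSetExpansion {
    // Emit one key array per grouping set: masked positions become null,
    // and the grouping-set value (GROUPING__ID) is appended as the last key.
    static List<Object[]> expand(Object[] keys, long[] groupingSets) {
        int n = keys.length;
        List<Object[]> out = new ArrayList<>();
        for (long gs : groupingSets) {
            Object[] row = new Object[n + 1];
            for (int pos = 0; pos < n; pos++) {
                boolean nulled = ((gs >> (n - 1 - pos)) & 1L) != 0;
                row[pos] = nulled ? null : keys[pos];
            }
            row[n] = gs;  // GROUPING__ID
            out.add(row);
        }
        return out;
    }

    public static void main(String[] args) {
        // input row a=1, b=2, c=3; sets (a,b,c)=0, (a,c)=2, ()=7
        for (Object[] row : expand(new Object[]{1, 2, 3}, new long[]{0L, 2L, 7L})) {
            System.out.println(Arrays.toString(row));
        }
        // [1, 2, 3, 0]
        // [1, null, 3, 2]
        // [null, null, null, 7]
    }
}
```

Each input row therefore fans out into one output row per grouping set, which is why queries with many grouping sets multiply the map-side output volume.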

    So far we have seen when each virtual column is assigned its value; a follow-up article will explain in detail how these values are generated.

[Copyright notice] This article is original content by a Huawei Cloud community user. When reprinting, the source (Huawei Cloud community), the article link, the author, and other basic information must be credited; otherwise the author and the community reserve the right to pursue liability. If you find suspected plagiarism in this community, please report it by email with supporting evidence; once verified, the community will immediately remove the infringing content. Report email: cloudbbs@huaweicloud.com