Generation and Computation of Hive Virtual Columns [2]

想要一只猫 · Posted on 2021/09/03 01:16:21
[Abstract] This article walks through the Hive source code behind virtual columns.

1. Introduction

The previous article described when Hive assigns values to the virtual columns: INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, ROW__OFFSET__INSIDE__BLOCK, RAW__DATA__SIZE and ROW__ID are assigned while MapOperator runs, whereas GROUPING__ID is assigned in GroupByOperator. This article introduces the two key objects that virtual column generation depends on: ExecMapperContext and IOContext.

2. ExecMapperContext

The MapOperator.populateVirtualColumnValues method discussed in the previous article takes an ExecMapperContext ctx parameter. As that code shows, virtual columns such as INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, ROW__OFFSET__INSIDE__BLOCK and ROW__ID are all derived from ctx or ctx.getIoCxt().
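To make that dependency concrete, here is a rough sketch of how such a method can derive INPUT__FILE__NAME and BLOCK__OFFSET__INSIDE__FILE from ctx. It is an abridged illustration, not the verbatim Hive source: the real method also reuses Writable objects and covers ROW__OFFSET__INSIDE__BLOCK, RAW__DATA__SIZE and ROW__ID, and the accessor names below simply mirror the fields shown later in this article.

import java.util.List;

import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

class VirtualColumnSketch {
  // Abridged sketch of how MapOperator.populateVirtualColumnValues uses ctx
  static Object[] populate(ExecMapperContext ctx, List<VirtualColumn> vcs, Object[] vcValues) {
    for (int i = 0; i < vcs.size(); i++) {
      switch (vcs.get(i)) {
        case FILENAME:
          // INPUT__FILE__NAME: the path of the file the MapTask is currently reading
          vcValues[i] = new Text(ctx.getCurrentInputPath().toString());
          break;
        case BLOCKOFFSET:
          // BLOCK__OFFSET__INSIDE__FILE: start offset of the current block, from IOContext
          vcValues[i] = new LongWritable(ctx.getIoCxt().getCurrentBlockStart());
          break;
        default:
          break;
      }
    }
    return vcValues;
  }
}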

ExecMapperContext holds context information about the MapTask. It is created right after the MapOperator is built and is then passed to the MapOperator and its downstream operators.

public abstract class Operator<T extends OperatorDesc> implements Serializable, Cloneable, Node {
       ......
       private transient ExecMapperContext execContext;
       ......
}
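The handoff to downstream operators happens through a recursive setter on Operator. The exact method name varies across Hive versions (setExecContext in some, a companion passExecContext in others), but the idea is simply to store the reference and push it into every child operator, which is why the single mapOp.setExecContext(execContext) call shown later is enough for the whole operator tree. A sketch of that propagation:

// Sketch: store the context and propagate it to all child operators
public void setExecContext(ExecMapperContext execContext) {
  this.execContext = execContext;
  if (childOperators != null) {
    for (Operator<? extends OperatorDesc> child : childOperators) {
      child.setExecContext(execContext);
    }
  }
}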

For MapReduce jobs, the MapOperator and the ExecMapperContext are created when ExecMapper is initialized:

public class ExecMapper extends MapReduceBase implements Mapper {
       private AbstractMapOperator mo;
       private ExecMapperContext execContext = null;
       ......
       public void configure(JobConf job) {
              execContext = new ExecMapperContext(job);
              ......
              if (mrwork.getVectorMode()) {
                      mo = new VectorMapOperator(runtimeCtx);
              } else {
                      mo = new MapOperator(runtimeCtx);
              }
              ......
       }
       ......
}

For Tez jobs, the MapOperator and the ExecMapperContext are created when MapRecordProcessor is initialized:

public class MapRecordProcessor extends RecordProcessor {
       private AbstractMapOperator mapOp;
       ......
       public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
               ......
               execContext = new ExecMapperContext(jconf);
               ......
       }

       void init(MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
              ......
              if (mapWork.getVectorMode()) {
                      mapOp = new VectorMapOperator(runtimeCtx);
              } else {
                      mapOp = new MapOperator(runtimeCtx);
              }
              mapOp.setExecContext(execContext);
              ......
       }
}

3. IOContext

IOContext is a field of ExecMapperContext. It holds the information the virtual columns need; how that information gets filled in is covered in the article on virtual column generation.

public class IOContext {
  private long currentBlockStart;
  private long nextBlockStart;
  private long currentRow;
  private boolean isBlockPointer;
  private boolean ioExceptions;

  // Are we using the fact the input is sorted
  private boolean useSorted = false;
  // Are we currently performing a binary search
  private boolean isBinarySearching = false;
  // Do we want to end the binary search
  private boolean endBinarySearch = false;
  // The result of the comparison of the last row processed
  private Comparison comparison = null;
  // The class name of the generic UDF being used by the filter
  private String genericUDFClassName = null;
  /**
   * supports {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID}
   */
  private RecordIdentifier ri;

  ......
}
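Who fills these fields in? That bookkeeping lives on the record-reader side and is covered in the article on virtual column generation, but the simplified sketch below shows the idea (the accessor names mirror the fields above; the real logic in Hive's context-aware record reader is more involved): before each record reaches MapOperator, the reader refreshes the current block start and the row index inside the block, which is exactly what BLOCK__OFFSET__INSIDE__FILE and ROW__OFFSET__INSIDE__BLOCK report.

// Simplified sketch of the reader-side updates (not verbatim Hive code)
void updateIOContext(IOContext ioCxt, long readerPos) {
  if (!ioCxt.isBlockPointer()) {
    // formats without block pointers: every record starts its own "block"
    ioCxt.setCurrentBlockStart(readerPos);
    ioCxt.setCurrentRow(0);
  } else {
    // block-pointer formats: count rows until the reader crosses into the next block
    ioCxt.setCurrentRow(ioCxt.getCurrentRow() + 1);
    if (readerPos != ioCxt.getNextBlockStart()) {
      ioCxt.setCurrentRow(0);
      ioCxt.setCurrentBlockStart(ioCxt.getNextBlockStart());
      ioCxt.setNextBlockStart(readerPos);
    }
  }
}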

The IOContext is obtained when the ExecMapperContext instance is created: the constructor looks it up through IOContextMap.

// from ExecMapperContext
  
  public ExecMapperContext(JobConf jc) {
    this.jc = jc;
    ioCxt = IOContextMap.get(jc);
  }

// from IOContextMap

  public static IOContext get(Configuration conf) {
    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
      return sparkThreadLocal.get();
    }
    String inputName = conf.get(Utilities.INPUT_NAME);
    if (inputName == null) {
      inputName = DEFAULT_CONTEXT;
    }
    String attemptId = threadAttemptId.get();
    ConcurrentHashMap<String, IOContext> map;
    if (attemptId == null) {
      map = globalMap;
    } else {
      map = attemptMap.get(attemptId);
      if (map == null) {
        map = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, IOContext> oldMap = attemptMap.putIfAbsent(attemptId, map);
        if (oldMap != null) {
          map = oldMap;
        }
      }
    }

    IOContext ioContext = map.get(inputName);
    if (ioContext != null) return ioContext;
    ioContext = new IOContext();
    IOContext oldContext = map.putIfAbsent(inputName, ioContext);
    return (oldContext == null) ? ioContext : oldContext;
  }
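The map is keyed by input name (Utilities.INPUT_NAME) and, when present, the task attempt ID, so that several inputs or several tasks sharing one JVM (for example under LLAP) each get their own IOContext, while Spark tasks fall back to a thread-local context. Because both the record reader and ExecMapperContext go through this same lookup, they end up sharing one IOContext per input. A minimal illustration of that handoff (not taken from Hive; the literal offset is made up):

JobConf conf = new JobConf();

// reader side: look up the IOContext and record the current position
IOContext readerSide = IOContextMap.get(conf);
readerSide.setCurrentBlockStart(12345L);

// operator side: ExecMapperContext resolves the very same instance,
// so populateVirtualColumnValues sees the offset written above
ExecMapperContext ctx = new ExecMapperContext(conf);
long offset = ctx.getIoCxt().getCurrentBlockStart();   // 12345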