Hive UDF源码解析【1】Create Function

想要一只猫 发表于 2021/08/28 23:41:36 2021/08/28
【摘要】 文本主要解析Hive添加UDF的源码流程

1 简介

    本文主要介绍Hive Create Function的主要流程,之后将解析Show Functions、Drop Function、Reload Function、UDF的调用流程等UDF相关源码。

CREATE FUNCTION [db_name.]function_name AS class_name
  [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];

1.1 简单的用例

// MyAddUDF实现了简单的加法
create function testadd as 'com.test.udf.MyAddUDF'
using
jar 'hdfs://localhost:9000/udf/MyTestUDF-1.0.jar', 
jar 'hdfs://localhost:9000/udf/UDFDep-1.0.jar';           // MyTestUDF的依赖

2 AST解析

using jar 语句被解析为TOK_RESOURCE_LIST,

TOK_CREATEFUNCTION
   testadd
   'com.test.udf.MyAddUDF'
   TOK_RESOURCE_LIST
      TOK_RESOURCE_URI
         TOK_JAR
         'hdfs://localhost:9000/udf/MyTestUDF-1.0.jar'
      TOK_RESOURCE_URI
         TOK_JAR
         'hdfs://localhost:9000/udf/UDFDep-1.0.jar'

3 DDL解析(FunctionSemanticAnalyzer)

     通过SemanticAnalyzerFactory.get可以知道,关于FUNCTION的DDL都是由FunctionSemanticAnalyzer处理。

// from  SemanticAnalyzerFactory

public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree) {
   ......
   switch (tree.getType()) {
       ......
      case HiveParser.TOK_CREATEFUNCTION:
      case HiveParser.TOK_DROPFUNCTION:
      case HiveParser.TOK_RELOADFUNCTION:
        return new FunctionSemanticAnalyzer(queryState);
      ......
   }
}

    SemanticAnalyzerFactory通过AST获取functionName、className、resourceList保存到CreateFunctionDesc中,并创建FunctionTask加入到rootTasks中。

// from FunctionSemanticAnalyzer

  public void analyzeInternal(ASTNode ast) throws SemanticException {
    if (ast.getType() == HiveParser.TOK_CREATEFUNCTION) {
      analyzeCreateFunction(ast);
    } else if (ast.getType() == HiveParser.TOK_DROPFUNCTION) {
      analyzeDropFunction(ast);
    } else if (ast.getType() == HiveParser.TOK_RELOADFUNCTION) {
      rootTasks.add(TaskFactory.get(new FunctionWork(new ReloadFunctionDesc()), conf));
    }
  }

  private void analyzeCreateFunction(ASTNode ast) throws SemanticException {
    String functionName = ast.getChild(0).getText().toLowerCase();   // 获取UDF名
    boolean isTemporaryFunction = (ast.getFirstChildWithType(HiveParser.TOK_TEMPORARY) != null);
    String className = unescapeSQLString(ast.getChild(1).getText()); // 获取类路径名
    ......
    // 从AST获取Resource list
    List<ResourceUri> resources = getResourceList(ast);
    // 执行Task时会使用到CreateFunctionDesc保存的信息
    CreateFunctionDesc desc = new CreateFunctionDesc(functionName, isTemporaryFunction, className, resources);
    // 由FunctionWork获取FunctionTask
    rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf));
    ......
  }

    TaskFactory通过遍历taskvec获取对应的Task子类,FunctionWork对应FunctionTask。

// from TaskFactory
  
  public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
      Task<? extends Serializable>... tasklist) {
    Task<T> ret = get((Class<T>) work.getClass(), conf);
    ......
  }

  public static <T extends Serializable> Task<T> get(Class<T> workClass,
      HiveConf conf) {
    // taskvec存储了 workClass - taskClass的对应关系
    for (TaskTuple<? extends Serializable> t : taskvec) {
      if (t.workClass == workClass) {
        try {
          Task<T> ret = (Task<T>) t.taskClass.newInstance();
          ret.setId("Stage-" + Integer.toString(getAndIncrementId()));
          return ret;
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    }
    ......
  }

  public static ArrayList<TaskTuple<? extends Serializable>> taskvec;
  static {
     ......
     taskvec.add(new TaskTuple<FunctionWork>(FunctionWork.class, FunctionTask.class));
     ......
  }

4 DDL语句执行(FunctionTask)

// from FunctionTask

  public int execute(DriverContext driverContext) {
    CreateFunctionDesc createFunctionDesc = work.getCreateFunctionDesc();
    if (createFunctionDesc != null) {
      if (createFunctionDesc.isTemp()) {
        // 如果是temporary function
        return createTemporaryFunction(createFunctionDesc);
      } else {
        try {
          // 否则添加永久函数(即会将函数信息加入metastore)
          return createPermanentFunction(Hive.get(conf), createFunctionDesc);
        } catch (Exception e) {
          setException(e);
          LOG.error(stringifyException(e));
          return 1;
        }
      }
    }
    ......
  }

4.1 FunctionRegistry

    这里注意checkLocalFunctionResources:当metastore是local模式时,resource uri才可以使用本地文件系统,否则只能使用如hdfs等文件系统。接下来是两个重要的步骤:

    1. 注册UDF:FunctionRegistry.registerPermanentFunction。这里有两个关键点:

          1) 将根据resource URL将jar包UDF下载到本地,并将本地jar包URL加入当前Session的UDFClassLoader的classpath;

          2) FunctionRegistry的静态常量Registry system里注册UDF信息。

    2. UDF信息写入metastore

// from FunctionTask
 
  private int createPermanentFunction(Hive db, CreateFunctionDesc createFunctionDesc)
      throws HiveException, IOException {
    // 如果functionName中不带dbName,即testdb.testfunc,则dbName为当前session的DB
    String[] qualifiedNameParts = FunctionUtils.getQualifiedFunctionNameParts(
        createFunctionDesc.getFunctionName());
    String dbName = qualifiedNameParts[0];
    String funcName = qualifiedNameParts[1];
    String registeredName = FunctionUtils.qualifyFunctionName(funcName, dbName);
    String className = createFunctionDesc.getClassName();
    // resouceList, 即udf jar包路径
    List<ResourceUri> resources = createFunctionDesc.getResources();

    // For permanent functions, check for any resources from local filesystem.
    checkLocalFunctionResources(db, createFunctionDesc.getResources());

    FunctionInfo registered = null;
    try {
      // 注册UDF
      registered = FunctionRegistry.registerPermanentFunction(
        registeredName, className, true, toFunctionResource(resources));
    } catch (RuntimeException ex) {
      Throwable t = ex;
      while (t.getCause() != null) {
        t = t.getCause();
      }
    }
    if (registered == null) {
      console.printError("Failed to register " + registeredName
          + " using class " + createFunctionDesc.getClassName());
      return 1;
    }

    // Add to metastore.   UDF信息加入metastore
    Function func = new Function(
        funcName,
        dbName,
        className,
        SessionState.get().getUserName(),
        PrincipalType.USER,
        (int) (System.currentTimeMillis() / 1000),
        org.apache.hadoop.hive.metastore.api.FunctionType.JAVA,
        resources
    );
    db.createFunction(func);
    return 0;
  }

    FunctionRegistry调用Registry system的registerPermanentFunction函数,Registry system是静态常量,里面保存了所有UDF的信息。

// from FunctionRegistry

  public static FunctionInfo registerPermanentFunction(String functionName,
      String className, boolean registerToSession, FunctionResource[] resources) {
    return system.registerPermanentFunction(functionName, className, registerToSession, resources);
  }

4.2 Registry

    Registry的registerPermanentFunction函数首先执行会registerToSessionRegistry。

// from Registry  
  public FunctionInfo registerPermanentFunction(String functionName,
      String className, boolean registerToSession, FunctionResource... resources) {
    FunctionInfo function = new FunctionInfo(functionName, className, resources);
    // register to session first for backward compatibility
    if (registerToSession) {
      String qualifiedName = FunctionUtils.qualifyFunctionName(
          functionName, SessionState.get().getCurrentDatabase().toLowerCase());
      if (registerToSessionRegistry(qualifiedName, function) != null) {
        addFunction(functionName, function);
        return function;
      }
    } else {
        addFunction(functionName, function);
    }
    return null;
  }

    registerToSessionRegistry有两个主要功能:

    1. 执行FunctionTask.addFunctionResources(resources),作用是下载resources所指的jar包到本地目录,同时将本地的jar包路径的classpath加入ClassLoader中。

    2. 执行SessionState.getRegistryForWrite().registerFunction,作用是将UDF信息加入当前Session的Registry system(注意区别FunctionRegistry的Registry system)。

// from Registry  

// should be called after session registry is checked
private FunctionInfo registerToSessionRegistry(String qualifiedName, FunctionInfo function) {
  FunctionInfo ret = null;
  // 或许当前Session的ClassLoader
  ClassLoader prev = Utilities.getSessionSpecifiedClassLoader();
  try {
    // Found UDF in metastore - now add it to the function registry
    // At this point we should add any relevant jars that would be needed for the UDf.
    FunctionResource[] resources = function.getResources();
    try {
      FunctionTask.addFunctionResources(resources);
    } catch (Exception e) {
      LOG.error("Unable to load resources for " + qualifiedName + ":" + e, e);
      return null;
    }
    ClassLoader loader = Utilities.getSessionSpecifiedClassLoader();
    Class<?> udfClass = Class.forName(function.getClassName(), true, loader);

    // Make sure the FunctionInfo is listed as PERSISTENT (rather than TEMPORARY)
    // when it is registered to the system registry.
    ret = SessionState.getRegistryForWrite().registerFunction(
        qualifiedName, FunctionType.PERSISTENT, udfClass, resources);
    if (ret == null) {
      LOG.error(function.getClassName() + " is not a valid UDF class and was not registered.");
    }
    if (SessionState.get().isHiveServerQuery()) {
      SessionState.getRegistryForWrite().addToUDFLoaders(loader);
    }
  } catch (ClassNotFoundException e) {
    // Lookup of UDf class failed
    LOG.error("Unable to load UDF class: " + e);
    Utilities.restoreSessionSpecifiedClassLoader(prev);
  }
  function.shareStateWith(ret);
  return ret;
}

    addFunctionResources主要调用SessionState的add_resources。

// From FunctionTask

public static void addFunctionResources(FunctionResource[] resources) throws HiveException {
  if (resources != null) {
    Multimap<SessionState.ResourceType, String> mappings = HashMultimap.create();
    for (FunctionResource res : resources) {
      mappings.put(res.getResourceType(), res.getResourceURI());
    }
    for (SessionState.ResourceType type : mappings.keys()) {
      SessionState.get().add_resources(type, mappings.get(type));
    }
  }
}

4.3 SessionState

    SessionState的add_resources两个要点:

    1. resolveAndDownload下载UDF jar包到本地,目录由hive.downloaded.resources.dir指定,默认是/tmp/${session_id}_resources。

    2. 将本地UDFjar包路径加入ClassLoader的classpath,这一点比较隐蔽,是调用t.preHook函数实现。

// From SessionState

public List<String> add_resources(ResourceType t, Collection<String> values)
    throws RuntimeException {
  // By default don't convert to unix
  return add_resources(t, values, false);
}

public List<String> add_resources(ResourceType t, Collection<String> values, boolean convertToUnix)
    throws RuntimeException {
  Set<String> resourceSet = resourceMaps.getResourceSet(t);
  Map<String, Set<String>> resourcePathMap = resourceMaps.getResourcePathMap(t);
  Map<String, Set<String>> reverseResourcePathMap = resourceMaps.getReverseResourcePathMap(t);
  List<String> localized = new ArrayList<String>();
  try {
    for (String value : values) {
      String key;

      //get the local path of downloaded jars. 下载jar包
      List<URI> downloadedURLs = resolveAndDownload(value, convertToUnix);

      if (ResourceDownloader.isIvyUri(value)) {
        // get the key to store in map
        key = ResourceDownloader.createURI(value).getAuthority();
      } else {
        // for local file and hdfs, key and value are same.
        key = downloadedURLs.get(0).toString();
      }
      Set<String> downloadedValues = new HashSet<String>();

      for (URI uri : downloadedURLs) {
        String resourceValue = uri.toString();
        downloadedValues.add(resourceValue);
        localized.add(resourceValue);
        if (reverseResourcePathMap.containsKey(resourceValue)) {
          if (!reverseResourcePathMap.get(resourceValue).contains(key)) {
            reverseResourcePathMap.get(resourceValue).add(key);
          }
        } else {
          Set<String> addSet = new HashSet<String>();
          addSet.add(key);
          reverseResourcePathMap.put(resourceValue, addSet);

        }
      }
      resourcePathMap.put(key, downloadedValues);
    }
   // !!!这里很重要,通过调用这个preHook将UDF jar包本地路径加入ClassLoader。
    t.preHook(resourceSet, localized);

  } catch (RuntimeException e) {
    getConsole().printError(e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
    throw e;
  } catch (URISyntaxException e) {
    getConsole().printError(e.getMessage());
    throw new RuntimeException(e);
  } catch (IOException e) {
    getConsole().printError(e.getMessage());
    throw new RuntimeException(e);
  }
  getConsole().printInfo("Added resources: " + values);
  resourceSet.addAll(localized);
  return localized;
}

     ResourceType的preHook函数通过调用registerJars加jar包路径加入ClassLoader

// From SessionState

public static enum ResourceType {
  FILE,

  JAR {
    @Override
    public void preHook(Set<String> cur, List<String> s) throws IllegalArgumentException {
      super.preHook(cur, s);
      // jar包路径加入ClassLoader
      registerJars(s);
    }
    @Override
    public void postHook(Set<String> cur, List<String> s) {
      unregisterJar(s);
    }
  },
  ARCHIVE;

  public void preHook(Set<String> cur, List<String> s) throws IllegalArgumentException {
    validateFiles(s);
  }
  public void postHook(Set<String> cur, List<String> s) {
  }
};

  static void registerJars(List<String> newJars) throws IllegalArgumentException {
    LogHelper console = getConsole();
    try {
      // AddToClassPathAction执行run方法将jar包路径加入ClassPath
      AddToClassPathAction addAction = new AddToClassPathAction(
          Thread.currentThread().getContextClassLoader(), newJars);
      final ClassLoader newLoader = AccessController.doPrivileged(addAction);
      Thread.currentThread().setContextClassLoader(newLoader);
      SessionState.get().getConf().setClassLoader(newLoader);
      console.printInfo("Added " + newJars + " to class path");
    } catch (Exception e) {
      String message = "Unable to register " + newJars;
      throw new IllegalArgumentException(message, e);
    }
  }

// From AddToClassPathAction

  public UDFClassLoader run() {
    if (useExistingClassLoader()) {
      final UDFClassLoader udfClassLoader = (UDFClassLoader) parentLoader;
      for (String path : newPaths) {
        // 加入ClassLoader
        udfClassLoader.addURL(Utilities.urlFromPathString(path));
      }
      return udfClassLoader;
    } else {
      return createUDFClassLoader();
    }
  }


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。