Hive UDF Source Code Walkthrough [1]: Create Function
1 Overview
This article walks through the main flow of Hive's CREATE FUNCTION statement. Follow-up articles will cover the rest of the UDF-related source code, including Show Functions, Drop Function, Reload Function, and the UDF invocation path.
CREATE FUNCTION [db_name.]function_name AS class_name
[USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
1.1 A simple example
-- MyAddUDF implements a simple addition UDF
create function testadd as 'com.test.udf.MyAddUDF'
using
jar 'hdfs://localhost:9000/udf/MyTestUDF-1.0.jar',
jar 'hdfs://localhost:9000/udf/UDFDep-1.0.jar'; -- a dependency of MyTestUDF
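The implementation of com.test.udf.MyAddUDF is not part of this article; for reference, below is a minimal sketch of what such a class might look like. The package and class name come from the example above; the body is an assumption based on Hive's simple UDF API (extending org.apache.hadoop.hive.ql.exec.UDF).
// Hypothetical sketch of the MyAddUDF referenced above: adds two integers.
package com.test.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
public class MyAddUDF extends UDF {
// Hive resolves evaluate() by reflection; null inputs yield a null output.
public IntWritable evaluate(IntWritable a, IntWritable b) {
if (a == null || b == null) {
return null;
}
return new IntWritable(a.get() + b.get());
}
}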
2 AST parsing
The USING JAR clause is parsed into a TOK_RESOURCE_LIST node:
TOK_CREATEFUNCTION
testadd
'com.test.udf.MyAddUDF'
TOK_RESOURCE_LIST
TOK_RESOURCE_URI
TOK_JAR
'hdfs://localhost:9000/udf/MyTestUDF-1.0.jar'
TOK_RESOURCE_URI
TOK_JAR
'hdfs://localhost:9000/udf/UDFDep-1.0.jar'
3 DDL analysis (FunctionSemanticAnalyzer)
As SemanticAnalyzerFactory.get shows, all FUNCTION-related DDL is handled by FunctionSemanticAnalyzer.
// from SemanticAnalyzerFactory
public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree) {
......
switch (tree.getType()) {
......
case HiveParser.TOK_CREATEFUNCTION:
case HiveParser.TOK_DROPFUNCTION:
case HiveParser.TOK_RELOADFUNCTION:
return new FunctionSemanticAnalyzer(queryState);
......
}
}
FunctionSemanticAnalyzer extracts functionName, className, and the resource list from the AST, stores them in a CreateFunctionDesc, and adds a FunctionTask built from it to rootTasks.
// from FunctionSemanticAnalyzer
public void analyzeInternal(ASTNode ast) throws SemanticException {
if (ast.getType() == HiveParser.TOK_CREATEFUNCTION) {
analyzeCreateFunction(ast);
} else if (ast.getType() == HiveParser.TOK_DROPFUNCTION) {
analyzeDropFunction(ast);
} else if (ast.getType() == HiveParser.TOK_RELOADFUNCTION) {
rootTasks.add(TaskFactory.get(new FunctionWork(new ReloadFunctionDesc()), conf));
}
}
private void analyzeCreateFunction(ASTNode ast) throws SemanticException {
String functionName = ast.getChild(0).getText().toLowerCase(); // the UDF name
boolean isTemporaryFunction = (ast.getFirstChildWithType(HiveParser.TOK_TEMPORARY) != null);
String className = unescapeSQLString(ast.getChild(1).getText()); // the fully-qualified class name
......
// extract the resource list from the AST
List<ResourceUri> resources = getResourceList(ast);
// the information stored in CreateFunctionDesc is used when the task executes
CreateFunctionDesc desc = new CreateFunctionDesc(functionName, isTemporaryFunction, className, resources);
// TaskFactory resolves FunctionWork to a FunctionTask
rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf));
......
}
TaskFactory iterates over taskvec to find the Task subclass registered for the given work class; FunctionWork maps to FunctionTask.
// from TaskFactory
public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
Task<? extends Serializable>... tasklist) {
Task<T> ret = get((Class<T>) work.getClass(), conf);
......
}
public static <T extends Serializable> Task<T> get(Class<T> workClass,
HiveConf conf) {
// taskvec stores the workClass -> taskClass mapping
for (TaskTuple<? extends Serializable> t : taskvec) {
if (t.workClass == workClass) {
try {
Task<T> ret = (Task<T>) t.taskClass.newInstance();
ret.setId("Stage-" + Integer.toString(getAndIncrementId()));
return ret;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
......
}
public static ArrayList<TaskTuple<? extends Serializable>> taskvec;
static {
......
taskvec.add(new TaskTuple<FunctionWork>(FunctionWork.class, FunctionTask.class));
......
}
4 DDL execution (FunctionTask)
// from FunctionTask
public int execute(DriverContext driverContext) {
CreateFunctionDesc createFunctionDesc = work.getCreateFunctionDesc();
if (createFunctionDesc != null) {
if (createFunctionDesc.isTemp()) {
// if this is a temporary function
return createTemporaryFunction(createFunctionDesc);
} else {
try {
// otherwise create a permanent function (its metadata is written to the metastore)
return createPermanentFunction(Hive.get(conf), createFunctionDesc);
} catch (Exception e) {
setException(e);
LOG.error(stringifyException(e));
return 1;
}
}
}
......
}
4.1 FunctionRegistry
Note checkLocalFunctionResources here: a resource URI may point to the local filesystem only when the metastore runs in local mode; otherwise the resources must live on a shared filesystem such as HDFS. Two important steps follow:
1. Register the UDF via FunctionRegistry.registerPermanentFunction. There are two key points:
1) the UDF jars referenced by the resource URIs are downloaded locally, and the local jar URLs are added to the classpath of the current session's UDFClassLoader;
2) the UDF information is registered in FunctionRegistry's static Registry instance, system.
2. Write the UDF information to the metastore.
// from FunctionTask
private int createPermanentFunction(Hive db, CreateFunctionDesc createFunctionDesc)
throws HiveException, IOException {
// if functionName has no dbName prefix (i.e. is not of the form testdb.testfunc), dbName defaults to the current session's database
String[] qualifiedNameParts = FunctionUtils.getQualifiedFunctionNameParts(
createFunctionDesc.getFunctionName());
String dbName = qualifiedNameParts[0];
String funcName = qualifiedNameParts[1];
String registeredName = FunctionUtils.qualifyFunctionName(funcName, dbName);
String className = createFunctionDesc.getClassName();
// resource list, i.e. the UDF jar URIs
List<ResourceUri> resources = createFunctionDesc.getResources();
// For permanent functions, check for any resources from local filesystem.
checkLocalFunctionResources(db, createFunctionDesc.getResources());
FunctionInfo registered = null;
try {
// register the UDF
registered = FunctionRegistry.registerPermanentFunction(
registeredName, className, true, toFunctionResource(resources));
} catch (RuntimeException ex) {
Throwable t = ex;
while (t.getCause() != null) {
t = t.getCause();
}
}
if (registered == null) {
console.printError("Failed to register " + registeredName
+ " using class " + createFunctionDesc.getClassName());
return 1;
}
// Add to metastore: write the UDF metadata to the metastore
Function func = new Function(
funcName,
dbName,
className,
SessionState.get().getUserName(),
PrincipalType.USER,
(int) (System.currentTimeMillis() / 1000),
org.apache.hadoop.hive.metastore.api.FunctionType.JAVA,
resources
);
db.createFunction(func);
return 0;
}
FunctionRegistry delegates to registerPermanentFunction on its static Registry field system, which holds the information for every registered UDF.
// from FunctionRegistry
public static FunctionInfo registerPermanentFunction(String functionName,
String className, boolean registerToSession, FunctionResource[] resources) {
return system.registerPermanentFunction(functionName, className, registerToSession, resources);
}
4.2 Registry
Registry.registerPermanentFunction first calls registerToSessionRegistry.
// from Registry
public FunctionInfo registerPermanentFunction(String functionName,
String className, boolean registerToSession, FunctionResource... resources) {
FunctionInfo function = new FunctionInfo(functionName, className, resources);
// register to session first for backward compatibility
if (registerToSession) {
String qualifiedName = FunctionUtils.qualifyFunctionName(
functionName, SessionState.get().getCurrentDatabase().toLowerCase());
if (registerToSessionRegistry(qualifiedName, function) != null) {
addFunction(functionName, function);
return function;
}
} else {
addFunction(functionName, function);
}
return null;
}
registerToSessionRegistry does two main things:
1. Call FunctionTask.addFunctionResources(resources), which downloads the jars referenced by the resources to a local directory and adds the local jar paths to the ClassLoader's classpath.
2. Call SessionState.getRegistryForWrite().registerFunction, which adds the UDF information to the current session's Registry (not to be confused with FunctionRegistry's static system Registry).
// from Registry
// should be called after session registry is checked
private FunctionInfo registerToSessionRegistry(String qualifiedName, FunctionInfo function) {
FunctionInfo ret = null;
// get the current session's ClassLoader
ClassLoader prev = Utilities.getSessionSpecifiedClassLoader();
try {
// Found UDF in metastore - now add it to the function registry
// At this point we should add any relevant jars that would be needed for the UDf.
FunctionResource[] resources = function.getResources();
try {
FunctionTask.addFunctionResources(resources);
} catch (Exception e) {
LOG.error("Unable to load resources for " + qualifiedName + ":" + e, e);
return null;
}
ClassLoader loader = Utilities.getSessionSpecifiedClassLoader();
Class<?> udfClass = Class.forName(function.getClassName(), true, loader);
// Make sure the FunctionInfo is listed as PERSISTENT (rather than TEMPORARY)
// when it is registered to the system registry.
ret = SessionState.getRegistryForWrite().registerFunction(
qualifiedName, FunctionType.PERSISTENT, udfClass, resources);
if (ret == null) {
LOG.error(function.getClassName() + " is not a valid UDF class and was not registered.");
}
if (SessionState.get().isHiveServerQuery()) {
SessionState.getRegistryForWrite().addToUDFLoaders(loader);
}
} catch (ClassNotFoundException e) {
// Lookup of UDf class failed
LOG.error("Unable to load UDF class: " + e);
Utilities.restoreSessionSpecifiedClassLoader(prev);
}
function.shareStateWith(ret);
return ret;
}
addFunctionResources mainly delegates to SessionState.add_resources.
// From FunctionTask
public static void addFunctionResources(FunctionResource[] resources) throws HiveException {
if (resources != null) {
Multimap<SessionState.ResourceType, String> mappings = HashMultimap.create();
for (FunctionResource res : resources) {
mappings.put(res.getResourceType(), res.getResourceURI());
}
for (SessionState.ResourceType type : mappings.keys()) {
SessionState.get().add_resources(type, mappings.get(type));
}
}
}
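For comparison, the same path can be driven directly against the current session; the fragment below is a sketch using the jar URI from section 1.1 and roughly mirrors what the ADD JAR command does in the CLI.
// Hypothetical direct call: download the jar and add its local path to the session class loader.
SessionState ss = SessionState.get();
ss.add_resources(SessionState.ResourceType.JAR,
java.util.Collections.singletonList("hdfs://localhost:9000/udf/MyTestUDF-1.0.jar"));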
4.3 SessionState
Two key points in SessionState.add_resources:
1. resolveAndDownload downloads the UDF jars to a local directory specified by hive.downloaded.resources.dir, which defaults to /tmp/${session_id}_resources.
2. The local UDF jar paths are added to the ClassLoader's classpath. This step is easy to miss: it happens inside the call to t.preHook.
// From SessionState
public List<String> add_resources(ResourceType t, Collection<String> values)
throws RuntimeException {
// By default don't convert to unix
return add_resources(t, values, false);
}
public List<String> add_resources(ResourceType t, Collection<String> values, boolean convertToUnix)
throws RuntimeException {
Set<String> resourceSet = resourceMaps.getResourceSet(t);
Map<String, Set<String>> resourcePathMap = resourceMaps.getResourcePathMap(t);
Map<String, Set<String>> reverseResourcePathMap = resourceMaps.getReverseResourcePathMap(t);
List<String> localized = new ArrayList<String>();
try {
for (String value : values) {
String key;
// download the jars and record their local paths
List<URI> downloadedURLs = resolveAndDownload(value, convertToUnix);
if (ResourceDownloader.isIvyUri(value)) {
// get the key to store in map
key = ResourceDownloader.createURI(value).getAuthority();
} else {
// for local file and hdfs, key and value are same.
key = downloadedURLs.get(0).toString();
}
Set<String> downloadedValues = new HashSet<String>();
for (URI uri : downloadedURLs) {
String resourceValue = uri.toString();
downloadedValues.add(resourceValue);
localized.add(resourceValue);
if (reverseResourcePathMap.containsKey(resourceValue)) {
if (!reverseResourcePathMap.get(resourceValue).contains(key)) {
reverseResourcePathMap.get(resourceValue).add(key);
}
} else {
Set<String> addSet = new HashSet<String>();
addSet.add(key);
reverseResourcePathMap.put(resourceValue, addSet);
}
}
resourcePathMap.put(key, downloadedValues);
}
// Important: this preHook call adds the local UDF jar paths to the ClassLoader.
t.preHook(resourceSet, localized);
} catch (RuntimeException e) {
getConsole().printError(e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
throw e;
} catch (URISyntaxException e) {
getConsole().printError(e.getMessage());
throw new RuntimeException(e);
} catch (IOException e) {
getConsole().printError(e.getMessage());
throw new RuntimeException(e);
}
getConsole().printInfo("Added resources: " + values);
resourceSet.addAll(localized);
return localized;
}
ResourceType.JAR's preHook adds the jar paths to the ClassLoader by calling registerJars.
// From SessionState
public static enum ResourceType {
FILE,
JAR {
@Override
public void preHook(Set<String> cur, List<String> s) throws IllegalArgumentException {
super.preHook(cur, s);
// add the jar paths to the ClassLoader
registerJars(s);
}
@Override
public void postHook(Set<String> cur, List<String> s) {
unregisterJar(s);
}
},
ARCHIVE;
public void preHook(Set<String> cur, List<String> s) throws IllegalArgumentException {
validateFiles(s);
}
public void postHook(Set<String> cur, List<String> s) {
}
};
static void registerJars(List<String> newJars) throws IllegalArgumentException {
LogHelper console = getConsole();
try {
// AddToClassPathAction.run adds the jar paths to the classpath
AddToClassPathAction addAction = new AddToClassPathAction(
Thread.currentThread().getContextClassLoader(), newJars);
final ClassLoader newLoader = AccessController.doPrivileged(addAction);
Thread.currentThread().setContextClassLoader(newLoader);
SessionState.get().getConf().setClassLoader(newLoader);
console.printInfo("Added " + newJars + " to class path");
} catch (Exception e) {
String message = "Unable to register " + newJars;
throw new IllegalArgumentException(message, e);
}
}
// From AddToClassPathAction
public UDFClassLoader run() {
if (useExistingClassLoader()) {
final UDFClassLoader udfClassLoader = (UDFClassLoader) parentLoader;
for (String path : newPaths) {
// add the jar URL to the class loader
udfClassLoader.addURL(Utilities.urlFromPathString(path));
}
return udfClassLoader;
} else {
return createUDFClassLoader();
}
}