[进阶篇]如何编写Hive UDAF技术指导
介绍
hive的UDAF(user-defined aggregate function)用户自定义聚合函数,可以允许用户自行开发某种数据处理逻辑的函数,将其注册在Hive中进行调用完成业务逻辑,UDAF可以接受多个输入数据行,并产生一个输出数据行,比如像我们日常使用的count、sum、max等函数都是UDAF的Hive内置函数。
如何自己编写UDAF
开发一个UDAF需要继承Hive的类有两种方式:
第一种方式:继承org.apache.hadoop.hive.ql.exec.UDAF,此种继承方式用于实现简单的UDAF,某些复杂场景不易处理,社区推荐使用第二种方式。
第二种方式:实现接口org.apache.hadoop.hive.ql.udf.generic. GenericUDAFResolver2或者实现接口org.apache.hadoop.hive.ql.udf.generic. GenericUDAFResolver,推荐实现org.apache.hadoop.hive.ql.udf.generic. GenericUDAFResolver2,此方式实现的UDAF较为通用。
接口实现
实现接口GenericUDAFResolver2的类是UDAF的入口类,实现此接口需要实现两个名为getEvaluator的方法,此两项方法是一个含义,其中getEvaluator(TypeInfo[] parameters)方法仅为了向后兼容而保留。getEvaluator返回抽象GenericUDAFEvaluator类或者此类的子类,GenericUDAFEvaluator是完成UDAF各个逻辑阶段的必要接口定义类,getEvaluator需要返回一个继承了GenericUDAFEvaluator的自定义实现类,用于满足具体业务逻辑需要。
继承GenericUDAFEvaluator,即继承抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator,子类必须实现一些抽象方法来实现UDAF的逻辑。由于Hive的UDF都是现有引擎逻辑的阶段开发的,无论MR、Tez还是Spark引擎,都分有mapper、reducer、combiner等阶段,因此首先介绍一下GenericUDAFEvaluator中的一个内联静态枚举类Mode,Mode代表了MR的各个阶段:
public static enum Mode {
/**
* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
* 将会调用iterate()和terminatePartial()
*/
PARTIAL1,
/**
* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
* 将会调用merge() 和 terminatePartial()
*/
PARTIAL2,
/**
* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
* 将会调用merge()和terminate()
*/
FINAL,
/**
* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
* 将会调用 iterate()和terminate()
*/
COMPLETE
};
如果有一些任务只有mapper,而没有reducer,那么就会只有COMPLETE阶段,这个阶段直接输入原始数据,输出结果
介绍一下需要实现的GenericUDAFEvaluator的必要方法:
// 确定各个阶段输入输出参数的数据格式ObjectInspectors
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
// 保存数据聚集结果的类
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
// 重置聚集结果
public void reset(AggregationBuffer agg) throws HiveException;
// map阶段,迭代处理输入sql传过来的列数据
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
// map与combiner结束返回结果,得到部分数据聚集结果
public Object terminatePartial(AggregationBuffer agg) throws HiveException;
// combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
public void merge(AggregationBuffer agg, Object partial) throws HiveException;
// reducer阶段,输出最终结果
public Object terminate(AggregationBuffer agg) throws HiveException;
具体实战的实例代码可参看社区源码实现:https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
UDAF函数描述的编写
如果用户希望自己开发的函数在注册之后,能够将使用方式示例描述信息从Hive侧查出来,比如:
在Hive中执行:desc function count;
结果显示UDAF的使用示例和描述:
要实现此项说明显示,需要在实现GenericUDAFResolver2接口的主类前加上注解Description,例如:
- 点赞
- 收藏
- 关注作者
评论(0)