[进阶篇]如何编写Hive UDAF技术指导

举报
蓝极光 发表于 2021/08/02 21:24:47 2021/08/02
【摘要】 介绍hive的UDAF(user-defined aggregate function)用户自定义聚合函数,可以允许用户自行开发某种数据处理逻辑的函数,将其注册在Hive中进行调用完成业务逻辑,UDAF可以接受多个输入数据行,并产生一个输出数据行,比如像我们日常使用的count、sum、max等函数都是UDAF的Hive内置函数。如何自己编写UDAF开发一个UDAF需要继承Hive的类有两种...

介绍

hive的UDAFuser-defined aggregate function)用户自定义聚合函数,可以允许用户自行开发某种数据处理逻辑的函数,将其注册在Hive中进行调用完成业务逻辑,UDAF可以接受多个输入数据行,并产生一个输出数据行,比如像我们日常使用的countsummax等函数都是UDAFHive内置函数。

如何自己编写UDAF

开发一个UDAF需要继承Hive的类有两种方式:

第一种方式:继承org.apache.hadoop.hive.ql.exec.UDAF,此种继承方式用于实现简单的UDAF,某些复杂场景不易处理,社区推荐使用第二种方式。

第二种方式:实现接口org.apache.hadoop.hive.ql.udf.generic. GenericUDAFResolver2或者实现接口org.apache.hadoop.hive.ql.udf.generic. GenericUDAFResolver,推荐实现org.apache.hadoop.hive.ql.udf.generic. GenericUDAFResolver2,此方式实现的UDAF较为通用。

接口实现

实现接口GenericUDAFResolver2的类是UDAF的入口类,实现此接口需要实现两个名为getEvaluator的方法,此两项方法是一个含义,其中getEvaluator(TypeInfo[] parameters)方法仅为了向后兼容而保留。getEvaluator返回抽象GenericUDAFEvaluator类或者此类的子类,GenericUDAFEvaluator是完成UDAF各个逻辑阶段的必要接口定义类,getEvaluator需要返回一个继承了GenericUDAFEvaluator的自定义实现类,用于满足具体业务逻辑需要。

继承GenericUDAFEvaluator,即继承抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator,子类必须实现一些抽象方法来实现UDAF的逻辑。由于HiveUDF都是现有引擎逻辑的阶段开发的,无论MRTez还是Spark引擎,都分有mapperreducercombiner等阶段,因此首先介绍一下GenericUDAFEvaluator中的一个内联静态枚举类ModeMode代表了MR的各个阶段:

public static enum Mode {
    /**
     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
     * 将会调用iterate()和terminatePartial()
     */
    PARTIAL1,

    /**
     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
     * 将会调用merge() 和 terminatePartial()
    */
    PARTIAL2,

    /**
     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
     * 将会调用merge()和terminate()
     */
    FINAL,

    /**
     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
     * 将会调用 iterate()和terminate()
     */
    COMPLETE
  };

如果有一些任务只有mapper,而没有reducer,那么就会只有COMPLETE阶段,这个阶段直接输入原始数据,输出结果

介绍一下需要实现的GenericUDAFEvaluator的必要方法:

// 确定各个阶段输入输出参数的数据格式ObjectInspectors
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// 保存数据聚集结果的类
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

// 重置聚集结果
public void reset(AggregationBuffer agg) throws HiveException;

// map阶段,迭代处理输入sql传过来的列数据
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// map与combiner结束返回结果,得到部分数据聚集结果
public Object terminatePartial(AggregationBuffer agg) throws HiveException;

// combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
public void merge(AggregationBuffer agg, Object partial) throws HiveException;

// reducer阶段,输出最终结果
public Object terminate(AggregationBuffer agg) throws HiveException;

具体实战的实例代码可参看社区源码实现:https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java

UDAF函数描述的编写

如果用户希望自己开发的函数在注册之后,能够将使用方式示例描述信息从Hive侧查出来,比如:

Hive中执行:desc function count;

结果显示UDAF的使用示例和描述:

要实现此项说明显示,需要在实现GenericUDAFResolver2接口的主类前加上注解Description,例如:

 

 

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。