Hive UDF, UDTF, and UDAF Custom Functions Explained

Byyyi耀, posted on 2024/05/06 11:04:48

Hive Notes 05 – Hive UDF UDTF UDAF

UDF

How UDFs are implemented in Hive


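Creating and configuring a UDF

Class naming convention
Example: com.ybg.hive.ql.func.udf.UDFDateDiffByUnit
Convention: reversed domain name + module name + function category (ql.func.udf: UDFs of the Hive query language) + a class name that describes the specific function
Basic configuration
New Project - Maven template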
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>
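UDF core principles

1. Manage argument types and argument values separately: types are checked in initialize(), values in evaluate()
2. Put the common validation logic into interfaces (as default methods)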
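The following JDK-only sketch shows the two principles outside Hive. `ArgChecks`, `ConcatSketch`, `init`, and `eval` are hypothetical names, not Hive APIs. They loosely mirror `UDFCom`, `GenericUDF.initialize`, and `GenericUDF.evaluate`. Argument types are checked once in `init`, argument values are checked once per row in `eval`, and the shared check is an interface default method.

```java
// Hypothetical mixin interface: shared checks as Java 8 default methods,
// in the spirit of UDFCom/DateCom (plain Java, not a Hive API)
interface ArgChecks {
    default void requireCount(Object[] args, int size) {
        if (args == null || args.length < size) {
            throw new IllegalArgumentException(size + " args must be provided.");
        }
        for (int i = 0; i < size; i++) {
            if (args[i] == null) {
                throw new IllegalArgumentException("args[" + i + "] is null");
            }
        }
    }
}

// Hypothetical two-argument concatenation "UDF"
class ConcatSketch implements ArgChecks {
    // Phase 1 (like initialize): check the argument TYPES once per query
    void init(Class<?>[] argTypes) {
        requireCount(argTypes, 2);
        for (Class<?> t : argTypes) {
            if (t != String.class) {
                throw new IllegalArgumentException("only String args are supported, got " + t.getSimpleName());
            }
        }
    }

    // Phase 2 (like evaluate): check and use the argument VALUES once per row
    String eval(Object[] values) {
        requireCount(values, 2);
        return values[0].toString() + values[1].toString();
    }

    public static void main(String[] args) {
        ConcatSketch udf = new ConcatSketch();
        udf.init(new Class<?>[]{String.class, String.class});
        System.out.println(udf.eval(new Object[]{"hive", "udf"})); // hiveudf
    }
}
```

A real GenericUDF would usually return NULL for a NULL value instead of throwing. This sketch throws so that it matches the strict checks used in this article.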

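UDF example:

Main class: UDFDateDiffByUnit extends GenericUDF implements UDFCom, DateCom
Purpose: computes the difference between two dates in years, quarters, months, weeks, or days.
Methods:
initialize: validates the arguments and defines the UDF's return type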

@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    validateArgs(arguments,3);
    validateAllPrimitiveArgs(arguments,3);
    // evaluate() returns an int (boxed to Integer), so the declared return type must be int, not string
    return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
}

evaluate: validates the actual data values
The core calculation method: computes the difference between the two dates.

@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
    validateArgs(arguments,3);
    // A SQL NULL in any argument yields NULL instead of a NullPointerException
    final Object argSmall = arguments[0].get();
    final Object argBig = arguments[1].get();
    final Object argUnit = arguments[2].get();
    if (argSmall == null || argBig == null || argUnit == null) {
        return null;
    }
    final String strDateSmall = argSmall.toString();
    final String strDateBig = argBig.toString();
    validateDateFormat(strDateSmall,strDateBig);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    Calendar dateSmall = Calendar.getInstance();
    Calendar dateBig = Calendar.getInstance();
    try {
        dateSmall.setTime(sdf.parse(strDateSmall));
        dateBig.setTime(sdf.parse(strDateBig));
    } catch (ParseException e) {
        throw new HiveException(e);
    }
    if(dateSmall.after(dateBig)){
        throw new HiveException("dateSmall by arg1 > dateBig by arg2");
    }
    final String unit = argUnit.toString().toLowerCase();
    // Quarters are counted in months and weeks in days; both are divided at the end
    int intUnit = 0;
    switch(unit){
        case "y":
            intUnit = Calendar.YEAR;
            break;
        case "q": case "m":
            intUnit = Calendar.MONTH;
            break;
        case "w": case "d":
            intUnit = Calendar.DATE;
            break;
        default:
            throw new HiveException("Unsupported unit by arg3 :"+unit);
    }
    // Count how many whole units can be added to dateSmall before it passes dateBig
    int diff = -1;
    while(true){
        diff++;
        dateSmall.add(intUnit,1);
        if(dateSmall.after(dateBig)){
            break;
        }
    }
    switch(unit){
        case "q":
            diff/=3;
            break;
        case "w":
            diff/=7;
            break;
    }
    return diff;
}
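For comparison, the same calculation can be sketched without a loop using `java.time`. This is plain JDK code, not the author's, and `DateDiffSketch` is a hypothetical name. `ChronoUnit.between` counts whole units directly.

There are two caveats when comparing it with the Calendar loop above:
- The loop adds months cumulatively, so day-of-month clamping builds up at month ends. For example, 2024-01-31 to 2024-03-30 gives 2 months in the loop but 1 here.
- `LocalDate.parse` requires two-digit months and days.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical loop-free variant of UDFDateDiffByUnit's core calculation (plain JDK)
class DateDiffSketch {
    static long diff(String small, String big, String unit) {
        // ISO yyyy-MM-dd: unlike the regex in DateCom, one-digit months/days are rejected here
        LocalDate from = LocalDate.parse(small);
        LocalDate to = LocalDate.parse(big);
        if (from.isAfter(to)) {
            throw new IllegalArgumentException("dateSmall by arg1 > dateBig by arg2");
        }
        switch (unit.toLowerCase()) {
            case "y": return ChronoUnit.YEARS.between(from, to);
            case "q": return ChronoUnit.MONTHS.between(from, to) / 3; // quarters = whole months / 3
            case "m": return ChronoUnit.MONTHS.between(from, to);
            case "w": return ChronoUnit.WEEKS.between(from, to);      // whole weeks = days / 7
            case "d": return ChronoUnit.DAYS.between(from, to);
            default: throw new IllegalArgumentException("Unsupported unit by arg3 :" + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(diff("2024-01-01", "2024-03-15", "d")); // 74
        System.out.println(diff("2024-01-01", "2024-03-15", "w")); // 10
        System.out.println(diff("2024-01-01", "2024-03-15", "m")); // 2
    }
}
```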

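getDisplayString: returns a description of the function and its arguments, shown for example when a query is explained (of minor importance)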

@Override
public String getDisplayString(String[] children) {
    return Objects.isNull(children) || children.length == 0 || null == children[0] ? null : children[0];
}

For example, for a "UDF that adds two numbers":

@Override
public String getDisplayString(String[] children){
    if(children == null || children.length<2){
        // fall back to stating how the function is used
        return "Usage:MyAddFunction(int,int)";
    }
    // text shown when the query is explained
    return "MyAddFunction(" + children[0] + ", " + children[1] + ")";
}

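Interface 1: DateCom
Role: provides date-related helper methods.
validateDateFormat: checks that each date string has the yyyy-MM-dd format (one-digit months and days are accepted too).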

import org.apache.hadoop.hive.ql.metadata.HiveException;

public interface DateCom {
    // Checks the yyyy-MM-dd shape and field ranges only; impossible dates such as 2024-02-31 still pass
    default void validateDateFormat(String...dateStrArr) throws HiveException {
        for (String dateStr : dateStrArr) {
            if (!dateStr.matches("\\d{4}-(0?[1-9]|1[0-2])-(0?[1-9]|[1-2][0-9]|3[0-1])")) {
                throw new HiveException("date format illegal : " + dateStr);
            }
        }
    }
}
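The regex only checks the shape and field ranges, so a value such as 2024-02-31 passes. `evaluate` then parses it with a lenient `SimpleDateFormat`, which rolls it over to 2024-03-02.

A stricter check could look like the sketch below. The `DateFormatCheck` class and its methods are hypothetical. The pattern uses `uuuu` (proleptic year) because `ResolverStyle.STRICT` with `yyyy` would also require an era.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;

// Hypothetical comparison of the regex check with a strict java.time parse
class DateFormatCheck {
    // Same pattern as validateDateFormat: shape and field ranges only
    static final String DATE_REGEX = "\\d{4}-(0?[1-9]|1[0-2])-(0?[1-9]|[1-2][0-9]|3[0-1])";

    // M and d accept one or two digits; STRICT rejects dates that do not exist
    static final DateTimeFormatter STRICT =
            DateTimeFormatter.ofPattern("uuuu-M-d").withResolverStyle(ResolverStyle.STRICT);

    static boolean matchesRegex(String s) {
        return s.matches(DATE_REGEX);
    }

    static boolean isRealDate(String s) {
        try {
            LocalDate.parse(s, STRICT);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String s : new String[]{"2024-02-29", "2024-02-31", "2024-5-6"}) {
            System.out.println(s + " regex=" + matchesRegex(s) + " realDate=" + isRealDate(s));
        }
    }
}
```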

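Interface 2: UDFCom
Role: provides general-purpose argument validation.
validateArgs: checks that enough arguments were passed and that none of them is null.
validateAllPrimitiveArgs: ensures that every argument is of a primitive type.
Primitive types are the most basic data types, whose values are stored directly (byte, short, int, long, float, double, char, boolean; note that in Hive string ✔ is a primitive type too).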

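import java.util.Objects;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public interface UDFCom {
    /**
     * Generic checks that do not depend on a particular UDF.
     * @param args  actual argument array
     * @param size  expected number of arguments
     * @throws UDFArgumentException if fewer than size arguments are given or one of them is null
     */
    default void validateArgs(Object[] args,int size) throws UDFArgumentException {
        if (size>0 && (Objects.isNull(args) || args.length < size)) {
            // at least `size` arguments must be provided
            throw new UDFArgumentException(size+" args must be provided.");
        }
        for (int i = 0; i < size; i++) {
            // no single argument may be null
            if (Objects.isNull(args[i])) {
                throw new UDFArgumentException("type of args["+i+"] null");
            }
        }
    }

    // Checks that the type of each argument is a primitive type
    default void validateAllPrimitiveArgs(Object[] args, int size) throws UDFArgumentException{
        for (int i = 0; i < size; i++) {
            if (((ObjectInspector)args[i]).getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentException("only support primitive type");
            }
        }
    }
}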

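Two ways to build the jar that is uploaded to HDFS:
– install => builds the jar and also installs it into the local Maven repository (localRepository), so other Maven projects can depend on it
– package => builds the jar into the project's target directory ✔ (used here)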

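Four steps to integrate a Hive UDF into the Hive query environment:

  1. Package
    Build the jar with package.
  2. Locate the jar
    The jar is in the target directory; Show in Explorer reveals its physical path.
  3. Upload
    Copy the jar to HDFS.
  4. Create the function
    Create a Hive function that maps to the JAR file on HDFS and specifies the fully qualified class name of the UDF implementation: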
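create function FUNC_NAME as 'com.ybg.hive.ql.func.udf.UDFDateDiffByUnit'
using jar 'hdfs://single01:9000/hive_data/udf/hiveudf2-1.0-SNAPSHOT.jar';
Here the class name is the fully qualified name of the main class, and the jar URI is hdfs://single01:9000 followed by the path of the jar on HDFS.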

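Problem and solution:

If the jar is deleted and uploaded again, the UDF may keep running the code of the previous jar, because the open session still has the old classes loaded.
Solution: close the project and reopen it, which reconnects and starts a new session.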

UDTF

Creating and configuring a UDTF

Same as for a UDF.

UDTF example

package com.ybg.hive.ql.func.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class MyExplode extends GenericUDTF {
    private static Logger logger = LoggerFactory.getLogger(MyExplode.class);
    private ObjectInspector oi;
    private Object[] params;

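    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        if (Objects.isNull(argOIs) || argOIs.length != 1) {
            throw new UDFArgumentException("Only 1 arg supported for function explode");
        }
        oi = argOIs[0];
        final ObjectInspector.Category category = oi.getCategory();
        // names and types hold the names and types of the output columns
        List<String> names = new ArrayList<>(2);
        List<ObjectInspector> types = new ArrayList<>(2);
        switch (category){
            // default output column names: key/value for a map, value for a list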
            case MAP:
                logger.info("receive explode category : Map");
                names.add("key");
                names.add("value");
                final MapObjectInspector moi = (MapObjectInspector) this.oi;
                types.add(moi.getMapKeyObjectInspector());
                types.add(moi.getMapValueObjectInspector());
                params = new Object[2];
                break;
            case LIST:
                logger.info("receive explode category : List");
                names.add("value");
                final ListObjectInspector loi = (ListObjectInspector) oi;
                types.add(loi.getListElementObjectInspector());
                params = new Object[1];
                break;
            default:
                throw new UDFArgumentException("not supported category for function explode : " + category);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(names,types);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        if (args.length != 1 || Objects.isNull(args[0])){
            throw new HiveException("Only 1 nonnull arg supported for function explode, but got " + args.length);
        }
        ObjectInspector.Category category = oi.getCategory();
        switch(category){
            case MAP:
                final Map<?, ?> map = ((MapObjectInspector) oi).getMap(args[0]);
                // Alternative with forEach; the lambda has to catch the checked HiveException itself:
                // map.entrySet().forEach(entry -> {
                //     params[0] = entry.getKey();
                //     params[1] = entry.getValue();
                //     try {
                //         forward(params);
                //     } catch (HiveException e) {
                //         throw new RuntimeException(e);
                //     }
                // });
                final Iterator<? extends Map.Entry<?, ?>> it = map.entrySet().iterator();
                while(it.hasNext()){
                    final Map.Entry<?, ?> entry = it.next();
                    params[0] = entry.getKey();
                    params[1] = entry.getValue();
                    forward(params);
                }
                break;
            case LIST:
                final List<?> list = ((ListObjectInspector) oi).getList(args[0]);
                final Iterator<?> itl = list.iterator();
                while (itl.hasNext()) {
                    params[0] = itl.next();
                    forward(params);
                }
                break;
        }
    }

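    @Override
    public void close() throws HiveException {
        oi = null;
        /**
         * Clear the array:
         * 1. The array holds object references. Setting each element to null breaks the link between the array
         *    and those objects, so nothing references them any more and they can be garbage-collected.
         * 2. Setting the array reference itself to null makes the array unreachable from this object,
         *    so it can be reclaimed as well.
         */
        // params is still null if initialize() failed before allocating it
        if (params != null) {
            for (int i = 0; i < params.length; i++) {
                params[i] = null;
            }
        }
        params = null;
    }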
}
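Without the ObjectInspector plumbing, `process()` simply turns one input value into several output rows. The JDK-only sketch below imitates this. `ExplodeSketch` is a hypothetical class, and a `Consumer<Object[]>` stands in for `GenericUDTF.forward()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of MyExplode.process(): one value in, many rows out
class ExplodeSketch {
    static void explode(Object value, Consumer<Object[]> forward) {
        if (value instanceof Map) {
            Object[] row = new Object[2];            // output columns: key, value
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                row[0] = e.getKey();
                row[1] = e.getValue();
                forward.accept(row);                 // the row array is reused, as in MyExplode
            }
        } else if (value instanceof List) {
            Object[] row = new Object[1];            // output column: value
            for (Object o : (List<?>) value) {
                row[0] = o;
                forward.accept(row);
            }
        } else {
            throw new IllegalArgumentException("not supported category for function explode : " + value);
        }
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("a", 1);
        m.put("b", 2);
        // Arrays.toString copies the row's content, since the array itself is reused
        explode(m, r -> rows.add(Arrays.toString(r)));
        explode(Arrays.asList(10, 20, 30), r -> rows.add(Arrays.toString(r)));
        System.out.println(rows); // [[a, 1], [b, 2], [10], [20], [30]]
    }
}
```

The row array is reused between calls, just as in MyExplode. A consumer that keeps rows must therefore copy them instead of storing the array.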

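Notes on the code

  1. For public class MyExplode extends GenericUDTF, the IDE only generates stubs for the two abstract methods process() and close(); initialize has to be overridden by hand. Here the deprecated initialize(ObjectInspector[] argOIs) is overridden. The non-deprecated variant takes a single StructObjectInspector (not an array) whose fields are the argument inspectors. Its default implementation unpacks them and calls initialize(ObjectInspector[]), so overriding the array version is enough to receive MAP and LIST arguments.

  2. Choose the loop style according to how exceptions should be handled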

// 1. forEach + lambda: the checked HiveException must be caught inside the lambda;
//    the loop can go on after an exception if the catch block does not rethrow
//  map.entrySet().forEach(entry -> {
//      params[0] = entry.getKey();
//      params[1] = entry.getValue();
//      try {
//          forward(params);
//      } catch (HiveException e) {
//          ...
//      }
//  });
// 2. Iterator loop: the exception propagates and ends the while loop
final Iterator<? extends Map.Entry<?, ?>> it = map.entrySet().iterator();
while(it.hasNext()){
    final Map.Entry<?, ?> entry = it.next();
    params[0] = entry.getKey();
    params[1] = entry.getValue();
    forward(params);
}
break;

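In the commented-out version, forEach expects a Consumer. Its accept() method cannot throw checked exceptions such as HiveException, so the exception has to be caught and handled inside the lambda.
With an explicit Iterator loop, the exception can simply be thrown to the caller.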
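The difference can be reproduced in plain Java. In this sketch, `ForwardException` is a hypothetical checked exception standing in for `HiveException`, and `forward` is a stand-in for `GenericUDTF.forward()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class CheckedInLambda {
    // Hypothetical checked exception standing in for HiveException
    static class ForwardException extends Exception {
        ForwardException(String msg) { super(msg); }
    }

    // Stand-in for forward(): fails on negative values
    static void forward(int v, List<Integer> out) throws ForwardException {
        if (v < 0) throw new ForwardException("bad row " + v);
        out.add(v);
    }

    // 1. Consumer.accept() cannot throw checked exceptions, so the lambda must catch them;
    //    this catch block logs and continues with the next element
    static void withForEach(List<Integer> in, List<Integer> out) {
        in.forEach(v -> {
            try {
                forward(v, out);
            } catch (ForwardException e) {
                System.err.println("skipped: " + e.getMessage());
            }
        });
    }

    // 2. An explicit Iterator loop can declare the checked exception and propagate it,
    //    which stops the loop at the first failure
    static void withIterator(List<Integer> in, List<Integer> out) throws ForwardException {
        Iterator<Integer> it = in.iterator();
        while (it.hasNext()) {
            forward(it.next(), out);
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        withForEach(Arrays.asList(1, -2, 3), out);
        System.out.println(out); // [1, 3]
    }
}
```

With input [1, -2, 3], `withForEach` skips -2 and keeps going. `withIterator` stops after 1 and leaves the exception to the caller.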

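  3. Clearing the array in close()
    a. The array holds object references. Setting each element to null breaks the link between the array and those objects, so nothing references them any more and they become eligible for garbage collection.
    b. Setting the array reference itself to null makes the array unreachable from this object, so it can be reclaimed as well.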

UDAF

UDAF Mode

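    PARTIAL1
        -- partial aggregation of the raw data
        -- iterate() & terminatePartial() are called
        -- Mapper

    PARTIAL2
        -- aggregation of partial aggregations
        -- merge() & terminatePartial() are called
        -- Combiner

    FINAL
        -- full aggregation of all partial aggregations
        -- merge() & terminate() are called
        -- Reducer

    COMPLETE
        -- full aggregation directly from the raw data
        -- iterate() & terminate() are called
        -- no partial step in between (e.g. on the reducer when map-side aggregation is disabled)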
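The call order of the modes can be simulated with a plain-Java sum. This is a sketch only: `UdafModesSketch` and `SumBuffer` are hypothetical, and only the method names mirror `GenericUDAFEvaluator`.

Splitting the rows across two "mappers" (PARTIAL1) and merging their partial results (FINAL) gives the same answer as COMPLETE. A PARTIAL2 combiner would add one more merge() + terminatePartial() step in between.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of GenericUDAFEvaluator's modes for SUM (plain JDK)
class UdafModesSketch {
    static class SumBuffer {
        long sum;
        boolean empty = true;
    }

    static void iterate(SumBuffer buf, Long v) {     // raw row in
        if (v != null) { buf.sum += v; buf.empty = false; }
    }

    static Long terminatePartial(SumBuffer buf) {     // partial result out (null if nothing was added)
        return buf.empty ? null : buf.sum;
    }

    static void merge(SumBuffer buf, Long partial) {  // partial result in
        if (partial != null) { buf.sum += partial; buf.empty = false; }
    }

    static Long terminate(SumBuffer buf) {            // final result out
        return buf.empty ? null : buf.sum;
    }

    // PARTIAL1 on each "mapper", then FINAL on the single "reducer"
    static Long twoStage(List<List<Long>> splits) {
        SumBuffer reducer = new SumBuffer();
        for (List<Long> split : splits) {
            SumBuffer mapper = new SumBuffer();
            for (Long v : split) iterate(mapper, v);   // PARTIAL1: iterate()
            merge(reducer, terminatePartial(mapper));  // PARTIAL1: terminatePartial() -> FINAL: merge()
        }
        return terminate(reducer);                     // FINAL: terminate()
    }

    // COMPLETE: raw rows straight to the final result
    static Long complete(List<Long> rows) {
        SumBuffer buf = new SumBuffer();
        for (Long v : rows) iterate(buf, v);
        return terminate(buf);
    }

    public static void main(String[] args) {
        List<Long> a = Arrays.asList(1L, 2L, null);
        List<Long> b = Arrays.asList(10L, 20L);
        System.out.println(twoStage(Arrays.asList(a, b))); // 33
    }
}
```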

Code

package cn.ybg.hive.ql.func.udaf;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class MySum extends AbstractGenericUDAFResolver {
    private static Logger logger = LoggerFactory.getLogger(MySum.class);

    // Checks that exactly one argument was passed and that it is not null
    private static void checkParam(String content, Object...params) throws SemanticException{
        if(Objects.isNull(params) || params.length!=1 || Objects.isNull(params[0])){
            throw new SemanticException(content);
        }
    }

    // getEvaluator(): picks and returns the UDAF evaluator that matches the type of the input argument
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        // info must not be null
        if(Objects.isNull(info)){
            throw new SemanticException("From YB12211 : MySum getEvaluator(info) : info NullPointerException");
        }
        // Type information (ObjectInspectors) of the arguments
        ObjectInspector[] params = info.getParameterObjectInspectors();

        // ObjectInspector[] is an Object[] (implicit upcast), so it is passed as the varargs array itself
        checkParam("From YB12211 : MySum getEvaluator(info) : only support one nonnull param",params);

        // Only primitive argument types are supported
        ObjectInspector.Category category = params[0].getCategory();
        if(category != ObjectInspector.Category.PRIMITIVE){
            throw new SemanticException("From YB12211 : MySum getEvaluator(info) : only support primitive type");
        }

        PrimitiveObjectInspector inputIO = (PrimitiveObjectInspector) params[0];
        AbstractSumEvaluator evaluator;
        // Choose the evaluator according to the primitive type of the argument
        switch (inputIO.getPrimitiveCategory()){
            case BYTE: case SHORT: case INT: case LONG:
                evaluator = new SumLong();
                break;
            case FLOAT: case DOUBLE:
                evaluator = new SumDouble();
                break;
            case DECIMAL:
                evaluator = new SumDecimal();
                break;
            default:
                throw new SemanticException("From YB12211 : MySum getEvaluator(info) : doesn't support type of "
                        +inputIO.getPrimitiveCategory());
        }

        // Pass the "windowing" and "DISTINCT" flags of this call on to the evaluator
        evaluator.setWindowing(info.isWindowing());
        evaluator.setDistinct(info.isDistinct());
        return evaluator;
    }

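    // Not private (the private modifier was removed)
    // AbstractSumEvaluator: the logic shared by all SUM evaluators of this UDAF
    // T is the result type of SUM, usually DoubleWritable, LongWritable or HiveDecimalWritable.
    static abstract class AbstractSumEvaluator<T extends Writable> extends GenericUDAFEvaluator{
        // AbstractSumAgg: the aggregation buffer of SUM
        // E is the type of the intermediate (running) sum
        abstract class AbstractSumAgg<E> extends AbstractAggregationBuffer{
            // true as long as no value has been added to the buffer
            boolean empty;
            E agg;
            // The element type differs from E, hence Set<Object>.
            // With DISTINCT in a window, this set is used to detect duplicate values.
            Set<Object> unique;

            // Constructor removed: the concrete subclasses call reset() in their own constructors
            /*public AbstractSumAgg() {
                reset();
            }*/

            public boolean isEmpty() {
                return empty;
            }

            // The parameter type differs per subclass, hence Object.
            // Returns false when the value is a duplicate under windowing + DISTINCT and must be skipped.
            boolean add(Object parameter){
                if(empty){
                    empty = false;
                }
                if (isWindowingAndDistinct()) {
                    // Subclasses pass an already converted Java value (HiveDecimal, Double or Long) with proper
                    // equals/hashCode, so it can be used as the key directly. Converting it again with inputIO,
                    // which describes the raw input, could throw a ClassCastException (e.g. a Long checked
                    // against an int inspector). Set.add() returns false if the value was already present.
                    return unique.add(parameter);
                }
                return true;
            }

            // Resets the buffer: sets `empty` to true and clears (or creates) the `unique` set when DISTINCT is used in a window
            void reset(){
                empty = true;
                if (isWindowingAndDistinct()) {
                    if(Objects.nonNull(unique)){
                        if(!unique.isEmpty()){
                            unique.clear();
                        }
                    }else{
                        unique = new HashSet<>();
                    }
                }
            }
        }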

        // Fields moved up here from the subclasses
        PrimitiveObjectInspector inputIO;
        PrimitiveObjectInspector outputIO;

        boolean isWindowing;
        boolean isDistinct;

        // Constructor removed: the two flags are now set through the setters below
        /*public AbstractSumEvaluator(boolean isWindowing, boolean isDistinct) {
            this.isWindowing = isWindowing;
            this.isDistinct = isDistinct;
        }*/

        // Setters (newly added)
        void setWindowing(boolean windowing) {
            isWindowing = windowing;
        }

        void setDistinct(boolean distinct) {
            isDistinct = distinct;
        }

        // willInit (parameter restored): whether inputIO and outputIO are initialized here
        void checkParamsAndInit(ObjectInspector[] params,Mode mode, boolean willInit, String content)
                throws HiveException {
            checkParam(content,params);
            // Let GenericUDAFEvaluator record the evaluator's mode and parameters
            super.init(mode, params);
            // Initialize the input and output inspectors if requested
            if (willInit) {
                inputIO = (PrimitiveObjectInspector) params[0];
                // Standard Java ObjectInspector derived from the input ObjectInspector
                outputIO = (PrimitiveObjectInspector) ObjectInspectorUtils
                        .getStandardObjectInspector(inputIO, ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
            }
        }

        boolean isWindowingAndDistinct(){
            return isWindowing && isDistinct;
        }

        // terminatePartial() returns the partial aggregation result. With DISTINCT in a window it throws instead,
        // because this implementation does not support partial aggregation for DISTINCT.
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            if (isWindowingAndDistinct()) {
                throw new HiveException("From YB12211 : distinct sum doesn't support terminatePartial(AggregationBuffer agg)");
            }
            return terminate(agg);
        }
    }

    static class SumDecimal extends AbstractSumEvaluator<HiveDecimalWritable>{
        class SumAggDecimal extends AbstractSumAgg<HiveDecimalWritable>{
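            // Calls reset() so that a new buffer starts in its initial state
            // (changed: this used to happen in the parent constructor)
            public SumAggDecimal() {
                reset();
            }

            // The parent class decides whether the value may be added (DISTINCT check); the subclass does the actual addition
            @Override
            boolean add(Object parameter) {
                HiveDecimal value = PrimitiveObjectInspectorUtils.getHiveDecimal(parameter, inputIO);
                if (super.add(value)) {
                    agg.mutateAdd(value);
                    return true;
                }
                return false;
            }

            @Override
            void reset() {
                super.reset();
                // agg is still null on the first call (from the constructor): create it, afterwards only reset the value
                if(Objects.isNull(agg)){
                    agg = new HiveDecimalWritable(HiveDecimal.ZERO);
                }else{
                    agg.set(HiveDecimal.ZERO);
                }
            }
        }

        // Initializes the evaluator for the given mode and returns the ObjectInspector of its result
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            checkParamsAndInit(parameters,m,false,"From YB12211 : SumDecimal.init(Mode m, ObjectInspector[] parameters) parameters can't be NULL");

            inputIO = (PrimitiveObjectInspector) parameters[0];

            // Enlarge the precision to allow for the range of the growing sum
            int precision = inputIO.precision();
            int scale = inputIO.scale();
            switch (m){
                // In PARTIAL1 and COMPLETE the raw DECIMAL input is summed, and the sum can need more digits than the
                // input type allows, so the precision is raised here (capped at HiveDecimal.MAX_PRECISION).
                // In PARTIAL2 and FINAL the input is the already widened partial result.
                case PARTIAL1: case COMPLETE:
                    precision = Math.min(precision+10, HiveDecimal.MAX_PRECISION);
                    break;
                default:
                    break;
            }
            DecimalTypeInfo decimalTypeInfo = TypeInfoFactory.getDecimalTypeInfo(precision, scale);
            /**
             * PrimitiveObjectInspector
             *      AbstractPrimitiveObjectInspector
             *          AbstractPrimitiveWritableObjectInspector
             */
            outputIO = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(decimalTypeInfo);
            outputIO = (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(outputIO);
            // Return the widened result type: terminate() and terminatePartial() return a HiveDecimalWritable,
            // and returning inputIO would declare the unwidened input precision instead
            return outputIO;
        }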

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new SumAggDecimal();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggDecimal)agg).reset();
        }

        /*private HiveDecimalWritable toHiveDecimalWritable(Object...value) throws HiveException {
            checkParam(value,"From YB12211 : SumDecimal.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
            return new HiveDecimalWritable(PrimitiveObjectInspectorUtils.getHiveDecimal(value, inputIO));
        }*/

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            //((SumAggDecimal)agg).add(toHiveDecimalWritable(parameters[0]));
            if (Objects.isNull(parameters) || parameters.length != 1) {
                throw new HiveException("From YB12211 : SumDecimal.iterate(AggregationBuffer agg, Object[] parameters) expects exactly one parameter");
            }
            // SQL SUM ignores NULL values, so skip them instead of failing the whole query
            if (Objects.nonNull(parameters[0])) {
                ((SumAggDecimal)agg).add(parameters[0]);
            }
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            //((SumAggDecimal)agg).add(toHiveDecimalWritable(partial));
            // terminatePartial() returns null for an empty buffer, so a null partial result is skipped
            if (Objects.nonNull(partial)) {
                ((SumAggDecimal)agg).add(partial);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            SumAggDecimal sumAgg = (SumAggDecimal) agg;
            if(sumAgg.isEmpty()){
                return null;
            }
            return sumAgg.agg;
        }
    }

    static class SumDouble extends AbstractSumEvaluator<DoubleWritable>{
        class SumAggDouble extends AbstractSumAgg<DoubleWritable>{
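            public SumAggDouble() {
                // changed: a new buffer starts in its initial state via reset()
                reset();
            }

            @Override
            boolean add(Object parameter) {
                double value = PrimitiveObjectInspectorUtils.getDouble(parameter, inputIO);
                if (super.add(value)) {
                    agg.set(agg.get()+value);
                    return true;
                }
                return false;
            }

            @Override
            void reset() {
                super.reset();
                // agg is still null on the first call (from the constructor): create it, afterwards only reset the value
                if(Objects.isNull(agg)){
                    agg = new DoubleWritable(0.0);
                }else{
                    agg.set(0.0);
                }
            }
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            checkParamsAndInit(parameters,m,true,"From YB12211 : SumDouble.init(Mode m, ObjectInspector[] parameters) parameters can't be null");
            // terminate() and terminatePartial() return a DoubleWritable, so the result type must be a writable double;
            // returning inputIO would mis-declare FLOAT (or lazy) input and break merge() on the partial result
            return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
        }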

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new SumAggDouble();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggDouble)agg).reset();
        }

        /*private DoubleWritable toDouble(Object...value) throws HiveException {
            checkParam(value,"From YB12211 : SumDouble.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
            return new DoubleWritable(PrimitiveObjectInspectorUtils.getDouble(value[0], inputIO));
        }*/

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            //((SumAggDouble)agg).add(toDouble(parameters[0]));
            if (Objects.isNull(parameters) || parameters.length != 1) {
                throw new HiveException("From YB12211 : SumDouble.iterate(AggregationBuffer agg, Object[] parameters) expects exactly one parameter");
            }
            // SQL SUM ignores NULL values, so skip them instead of failing the whole query
            if (Objects.nonNull(parameters[0])) {
                ((SumAggDouble)agg).add(parameters[0]);
            }
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            //((SumAggDouble)agg).add(toDouble(partial));
            // terminatePartial() returns null for an empty buffer, so a null partial result is skipped
            if (Objects.nonNull(partial)) {
                ((SumAggDouble)agg).add(partial);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            SumAggDouble sumAgg = (SumAggDouble) agg;
            if(sumAgg.isEmpty()){
                return null;
            }
            return sumAgg.agg;
        }
    }

    static class SumLong extends AbstractSumEvaluator<LongWritable>{
        class SumAggLong extends AbstractSumAgg<LongWritable>{
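            public SumAggLong() {
                // changed: a new buffer starts in its initial state via reset()
                reset();
            }

            @Override
            boolean add(Object parameter) {
                long value = PrimitiveObjectInspectorUtils.getLong(parameter, inputIO);
                if (super.add(value)) {
                    agg.set(agg.get()+value);
                    return true;
                }
                return false;
            }

            @Override
            void reset() {
                super.reset();
                // agg is still null on the first call (from the constructor): create it, afterwards only reset the value
                if(Objects.isNull(agg)){
                    agg = new LongWritable(0);
                }else{
                    agg.set(0);
                }
            }
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            checkParamsAndInit(parameters,m,true,"From YB12211 : SumLong.init(Mode m, ObjectInspector[] parameters) parameters can't be null");
            // terminate() and terminatePartial() return a LongWritable, so the result type must be a writable long;
            // returning inputIO would mis-declare BYTE/SHORT/INT input and break merge() on the partial result
            return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
        }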

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new SumAggLong();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggLong)agg).reset();
        }

        /*private LongWritable toLong(Object...value) throws HiveException {
            checkParam(value,"From YB12211 : SumLong.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
            return new LongWritable(PrimitiveObjectInspectorUtils.getLong(value[0], inputIO));
        }*/

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            //((SumAggLong)agg).add(toLong(parameters[0]));
            if (Objects.isNull(parameters) || parameters.length != 1) {
                throw new HiveException("From YB12211 : SumLong.iterate(AggregationBuffer agg, Object[] parameters) expects exactly one parameter");
            }
            // SQL SUM ignores NULL values, so skip them instead of failing the whole query
            if (Objects.nonNull(parameters[0])) {
                ((SumAggLong)agg).add(parameters[0]);
            }
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            //((SumAggLong)agg).add(toLong(partial));
            // terminatePartial() returns null for an empty buffer, so a null partial result is skipped
            if (Objects.nonNull(partial)) {
                ((SumAggLong)agg).add(partial);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            SumAggLong sumAgg = (SumAggLong) agg;
            if(sumAgg.isEmpty()){
                return null;
            }
            return sumAgg.agg;
        }
    }
}
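Plain Java overflow arithmetic shows why SumLong accumulates BYTE/SHORT/INT input in a `long` and SumDecimal widens the precision. In this sketch, `SumWideningSketch` is a hypothetical class and `BigDecimal` stands in for `HiveDecimal`. The cap of 38 is Hive's maximum DECIMAL precision (`HiveDecimal.MAX_PRECISION`).

```java
import java.math.BigDecimal;

// Hypothetical illustration of SUM's result widening (plain JDK, not Hive code)
class SumWideningSketch {
    // Summing ints into an int silently wraps around on overflow
    static int sumAsInt(int[] values) {
        int s = 0;
        for (int v : values) s += v;
        return s;
    }

    // Accumulating into a long, as SumLong does via getLong(), keeps the exact result here
    static long sumAsLong(int[] values) {
        long s = 0;
        for (int v : values) s += v;
        return s;
    }

    // Mirrors SumDecimal.init for PARTIAL1/COMPLETE: precision + 10, capped at 38
    static int widenedPrecision(int inputPrecision) {
        return Math.min(inputPrecision + 10, 38);
    }

    public static void main(String[] args) {
        int[] big = {Integer.MAX_VALUE, Integer.MAX_VALUE};
        System.out.println(sumAsInt(big));                    // -2
        System.out.println(sumAsLong(big));                   // 4294967294
        BigDecimal max52 = new BigDecimal("999.99");          // largest DECIMAL(5,2) value
        System.out.println(max52.add(max52).precision());     // 6, more than DECIMAL(5,2) allows
        System.out.println(widenedPrecision(5));              // 15
    }
}
```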

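Creating and configuring a UDAF

Same as above.

UDAF workflow

  1. Create the UDAF resolver class (extends AbstractGenericUDAFResolver)
  2. Implement getEvaluator in the resolver class: it returns the evaluator that matches the argument type
  3. Create the evaluator class (usually extends GenericUDAFEvaluator)
    Implement the UDAF logic and the other core methods in it (reset, iterate, merge…)
  4. Create the concrete subclasses of the evaluator class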
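The four steps map onto a simple class layout. The JDK-only sketch below mirrors that layout with hypothetical classes, not Hive API: a resolver picks a concrete evaluator by type name, and each evaluator subclass implements the shared abstract methods. Modes, partial results, and the NULL-for-empty rule are left out.

```java
import java.math.BigDecimal;

// Step 3: evaluator base class with the shared abstract methods (hypothetical, not Hive API)
abstract class SumEvaluatorSketch {
    abstract void reset();
    abstract void iterate(Object value);
    abstract Object terminate();
}

// Step 4: concrete evaluators, one per result type
class LongSum extends SumEvaluatorSketch {
    private long sum;
    void reset() { sum = 0L; }
    void iterate(Object v) { if (v != null) sum += ((Number) v).longValue(); }
    Object terminate() { return sum; }
}

class DoubleSum extends SumEvaluatorSketch {
    private double sum;
    void reset() { sum = 0.0; }
    void iterate(Object v) { if (v != null) sum += ((Number) v).doubleValue(); }
    Object terminate() { return sum; }
}

class DecimalSum extends SumEvaluatorSketch {
    private BigDecimal sum = BigDecimal.ZERO;
    void reset() { sum = BigDecimal.ZERO; }
    void iterate(Object v) { if (v != null) sum = sum.add((BigDecimal) v); }
    Object terminate() { return sum; }
}

// Steps 1-2: the resolver picks an evaluator from the argument type
class SumResolverSketch {
    static SumEvaluatorSketch getEvaluator(String typeName) {
        switch (typeName) {
            case "tinyint": case "smallint": case "int": case "bigint": return new LongSum();
            case "float": case "double": return new DoubleSum();
            case "decimal": return new DecimalSum();
            default: throw new IllegalArgumentException("doesn't support type of " + typeName);
        }
    }

    public static void main(String[] args) {
        SumEvaluatorSketch e = getEvaluator("int");
        e.reset();
        for (Object v : new Object[]{1, null, 2}) e.iterate(v);
        System.out.println(e.terminate()); // 3
    }
}
```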

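Differences between UDF, UDTF, and UDAF

UDF: one in, one out
Special case: several in, one out, where the inputs are parallel columns of the same row (e.g. concat)
UDTF: one in, many out (turns a column into rows, e.g. explode)
UDAF: many in, one out (aggregates many rows, e.g. sum)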
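As a rough analogy in plain Java (hypothetical names, not Hive API), the three kinds differ only in how many rows go in and how many values or rows come out:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical analogy for the three function shapes (plain JDK)
class FunctionShapes {
    // UDF: one value in, one value out (like upper)
    static final Function<String, String> UDF = s -> s.toUpperCase(Locale.ROOT);

    // Special UDF case: several columns of the SAME row in, one value out (like concat)
    static final BinaryOperator<String> CONCAT = String::concat;

    // UDTF: one value in, many rows out (like explode(split(s, ',')))
    static final Function<String, List<String>> UDTF = s -> Arrays.asList(s.split(","));

    // UDAF: many rows in, one value out (like sum)
    static long udaf(List<Long> rows) {
        return rows.stream().reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(UDF.apply("hive"));                // HIVE
        System.out.println(CONCAT.apply("hi", "ve"));         // hive
        System.out.println(UDTF.apply("a,b,c"));              // [a, b, c]
        System.out.println(udaf(Arrays.asList(1L, 2L, 3L)));  // 6
    }
}
```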

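[Copyright notice] This article is original content by a Huawei Cloud community user. When reposting, you must credit the source (Huawei Cloud community), the article link, the author, and other basic information; otherwise the author and the community reserve the right to pursue liability. If you find suspected plagiarism in this community, please report it by email with relevant evidence; once verified, the community will promptly remove the infringing content. Report email: cloudbbs@huaweicloud.com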