Flink in Practice (6): Functions in FlinkSQL

Posted by Maynor学长 on 2022/10/27 15:25:57

6. Functions in FlinkSQL

FlinkSQL ships with a large number of built-in functions, and working in SQL is largely a matter of using functions. This section gives a unified introduction to both the built-in functions and user-defined functions.

6.1 Overview of Built-in Functions

The Flink Table API/SQL provides a large number of built-in functions; treat the official reference as a dictionary and look them up as needed:

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/systemfunctions/
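As a quick illustration, a few common built-ins used in one query (the table and column names here are hypothetical, purely for the example):

```sql
-- Hypothetical table: orders(user_name STRING, amount DOUBLE, order_time TIMESTAMP(3))
SELECT
  UPPER(user_name)                      AS user_name_upper,  -- string function
  CONCAT(user_name, '@example.com')     AS contact,          -- string concatenation
  ROUND(amount, 2)                      AS amount_rounded,   -- numeric function
  DATE_FORMAT(order_time, 'yyyy-MM-dd') AS order_date        -- temporal function
FROM orders;
```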


6.2 User-Defined Functions in FlinkSQL

Besides the built-in functions above, FlinkSQL also supports user-defined functions. Let's look at how to implement them.

Categories of user-defined functions

In Hive there are UDF, UDAF, and UDTF user-defined functions. FlinkSQL simplifies these concepts and instead speaks of Scalar Functions, Table Functions, Aggregate Functions, and related variants.


6.2.1 How to Invoke User-Defined Functions

How FlinkSQL user-defined functions are invoked:

They can be called inline via the call function (without prior registration), or registered under a name and then referenced from SQL.

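For instance, once a scalar function has been registered under a name (as JsonParse is in section 6.3 below), it can be called in SQL just like a built-in:

```sql
-- Assumes JsonParse was registered via createTemporarySystemFunction("JsonParse", ...)
-- and json_table has a single STRING column `line`
SELECT JsonParse(line, 'name') AS user_name
FROM json_table;
```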

6.2.2 Implementing a User-Defined Function in FlinkSQL

To define a function, first extend the corresponding base class, e.g. ScalarFunction. The class must be declared public, non-abstract, and globally accessible; non-static inner classes and anonymous classes are therefore not allowed. It must also have a default constructor, because Flink needs to instantiate it and register it in the catalog.

It must provide a public eval method with well-defined parameters (overloading, varargs, and inheritance are supported).


6.2.3 The open (Initialization) and close (Cleanup) Methods

The open and close methods of the UDF base classes can be overridden to add custom initialization and cleanup logic. The open method receives a FunctionContext parameter, through which various pieces of information about the runtime environment can be obtained (for example the metric group, distributed-cache files, and global job parameters).


6.3 UDF in Practice: One Row In, One Value Out

We use a FlinkSQL ScalarFunction to implement JSON parsing: pass in a JSON key and get back the corresponding value.

The JSON data format is as follows:

{"date_time":"2022-10-04 08:01:48","email":"kyzqcd0686@vjikq.tng","id":0,"name":"郑剃"}
{"date_time":"2022-10-04 08:06:31","email":"bvkqwbmgwi@lh80q.4ln","id":1,"name":"闾丘喜造"}
{"date_time":"2022-10-04 08:04:39","email":"axvcbj7vbo@ecyi1.4gw","id":2,"name":"王曙介"}
{"date_time":"2022-10-04 08:00:19","email":"ew1qu5sunz@caxtg.vtn","id":3,"name":"赖溯姆"}

Define the JSON parsing function:


import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class JsonParseFunction extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        // No initialization needed for this function
    }

    /**
     * Parses the JSON line and returns the value for the given key,
     * or an empty string if the key is absent.
     */
    public String eval(String jsonLine, String key) {
        JSONObject jsonObject = JSONObject.parseObject(jsonLine);
        if (jsonObject.containsKey(key)) {
            return jsonObject.getString(key);
        } else {
            return "";
        }
    }

    @Override
    public void close() throws Exception {
        // No cleanup needed for this function
    }
}

The code implementation is as follows:


import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class FlinkSQLScalarFunction {

    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // removed since Flink 1.14: the Blink planner is the only (and default) planner
                //.inStreamingMode() // streaming mode is the default
                .inBatchMode()
                .build();

        TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        tableEnvironment.createTemporarySystemFunction("JsonParse", JsonParseFunction.class);


        String source_sql = "CREATE TABLE json_table (\n" +
                "  line STRING \n" +
                ") WITH (\n" +
                "  'connector'='filesystem',\n" +
                "  'path'='input/userbase.json',\n" +
                "  'format'='raw'\n" +
                ")";

        tableEnvironment.executeSql(source_sql);
        tableEnvironment.sqlQuery("select * from json_table").execute().print();

        tableEnvironment.sqlQuery("select JsonParse(line,'date_time'),JsonParse(line,'email'),JsonParse(line,'id'),JsonParse(line,'name') from json_table")
                .execute().print();

    }

}

6.4 UDTF in Practice: One Row In, Multiple Rows Out

We can also define a function that takes one input row and emits multiple rows, much like an explode function.

Table functions take one row in and emit many rows (explode). Extend TableFunction, provide an eval method with no return value, and emit rows with collect.

A table function returns a table, which must be joined back to the original table to produce the final result; this is done with a lateral table (if this is unfamiliar, look into LATERAL TABLE).
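As a sketch, the two standard LATERAL TABLE join forms look like this (using the explodeFunc UDTF and json_table defined later in this section):

```sql
-- Inner (cross) lateral join: input rows for which the UDTF emits nothing are dropped
SELECT t.line, u.id, u.name
FROM json_table AS t,
     LATERAL TABLE(explodeFunc(t.line, 'userBaseList')) AS u(id, name, begin_time, email);

-- Left outer lateral join: such input rows are kept, padded with NULLs
SELECT t.line, u.id, u.name
FROM json_table AS t
LEFT JOIN LATERAL TABLE(explodeFunc(t.line, 'userBaseList'))
  AS u(id, name, begin_time, email) ON TRUE;
```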

The JSON data is shown below. The userBaseList field is an array holding multiple user records; we use a UDTF to extract each user record into its own row.

{"date_time":1665145907806,"price":258.7,"productId":920956185,"userBaseList":[{"begin_time":"2022-10-07 08:38:31","email":"njvjeuchpe@mk1t0.d4e","id":"0","name":"尹修彻"},{"begin_time":"2022-10-07 08:33:59","email":"qurkb119uo@fvyg5.kqj","id":"1","name":"萧幅括"},{"begin_time":"2022-10-07 08:37:40","email":"i4w8ecponz@bpoay.3yv","id":"2","name":"胡乘"},{"begin_time":"2022-10-07 08:38:05","email":"uwl7fpfwbb@b7riu.fh3","id":"3","name":"黄煎"},{"begin_time":"2022-10-07 08:37:12","email":"bjjqrvajih@c75ur.lhs","id":"4","name":"袁肇"}]}
{"date_time":1665145918652,"price":258.7,"productId":-786075263,"userBaseList":[{"begin_time":"2022-10-07 08:39:47","email":"yfynwlektk@sz0me.hys","id":"0","name":"程痢"},{"begin_time":"2022-10-07 08:32:03","email":"jwpwuiwdnc@esxbd.hta","id":"1","name":"程盐殃"},{"begin_time":"2022-10-07 08:40:17","email":"fbfnidktqg@zaxxw.g1w","id":"2","name":"蔡锻"},{"begin_time":"2022-10-07 08:35:24","email":"twrm30opcb@5rgzj.sow","id":"3","name":"李猎甩"},{"begin_time":"2022-10-07 08:33:05","email":"rnkabnvaz9@bt319.xlk","id":"4","name":"夏焙匈"}]}
{"date_time":1665145927285,"price":258.7,"productId":-988723330,"userBaseList":[{"begin_time":"2022-10-07 08:37:04","email":"pcs8ejgibk@kxf95.djq","id":"0","name":"郝疯框"},{"begin_time":"2022-10-07 08:40:20","email":"n63k5twind@eddbg.aui","id":"1","name":"万侨"},{"begin_time":"2022-10-07 08:33:52","email":"1xmk0vh3bb@1htg2.tw2","id":"2","name":"侯临迸"},{"begin_time":"2022-10-07 08:33:05","email":"cnrqk4crpy@svhkq.wwf","id":"3","name":"闾丘耘"},{"begin_time":"2022-10-07 08:34:26","email":"ubozcxmrxc@c6qpp.8ug","id":"4","name":"皇甫坡"}]}

The code implementation is as follows:


import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class FlinkSQLTableFunction {

    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // removed since Flink 1.14: the Blink planner is the only (and default) planner
                //.inStreamingMode() // streaming mode is the default
                .inBatchMode()
                .build();

        TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        tableEnvironment.createTemporarySystemFunction("JsonFunc", JsonFunction.class);
        tableEnvironment.createTemporarySystemFunction("explodeFunc", ExplodeFunc.class);


        String source_sql = "CREATE TABLE json_table (\n" +
                "  line STRING \n" +
                ") WITH (\n" +
                "  'connector'='filesystem',\n" +
                "  'path'='input/product_user.json',\n" +
                "  'format'='raw'\n" +
                ")";

        tableEnvironment.executeSql(source_sql);

        // Approach 1: Table API with an inner lateral join (joinLateral)
        tableEnvironment.from("json_table")
                .joinLateral(call(ExplodeFunc.class,$("line"),"userBaseList")
                        .as("id","name","begin_time","email"))
                        .select(call(JsonFunction.class,$("line"),"date_time"),
                                call(JsonFunction.class,$("line"),"price"),
                                call(JsonFunction.class,$("line"),"productId"),
                                $("id"),
                                $("name"),
                                $("begin_time"),
                                $("email")
                        ).execute().print();

        // Approach 2: Table API with a left outer lateral join
        tableEnvironment.from("json_table")
                .leftOuterJoinLateral(call(ExplodeFunc.class,$("line"),"userBaseList")
                        .as("id","name","begin_time","email"))
                .select(call(JsonFunction.class,$("line"),"date_time"),
                        call(JsonFunction.class,$("line"),"price"),
                        call(JsonFunction.class,$("line"),"productId"),
                        $("id"),
                        $("name"),
                        $("begin_time"),
                        $("email")
                ).execute().print();

        // Approach 3: FlinkSQL inner join via comma + LATERAL TABLE
        tableEnvironment.sqlQuery("select " +
                        "JsonFunc(line,'date_time')," +
                        "JsonFunc(line,'price')," +
                        "JsonFunc(line,'productId')," +
                        "id," +
                        "name," +
                        "begin_time," +
                        "email" +
                        "  from json_table " +
                        ",lateral table(explodeFunc(line,'userBaseList')) "
        ).execute().print();



        // Approach 4: FlinkSQL left outer join with LATERAL TABLE ... ON TRUE
        tableEnvironment.sqlQuery("select " +
                "JsonFunc(line,'date_time') as date_time," +
                "JsonFunc(line,'price') as price," +
                "JsonFunc(line,'productId') as productId," +
                "id," +
                "name," +
                "begin_time," +
                "email" +
                "  from json_table  left join lateral table (explodeFunc(line,'userBaseList')) as sc(id,name,begin_time,email) on true"
        ).execute().print();

    }


    /**
     * Custom UDF: extracts the value for a given key from a JSON line.
     */
    public static class JsonFunction extends ScalarFunction {
        public String eval(String line, String key) {
            // Parse the line as JSON
            JSONObject baseJson = new JSONObject(line);
            String value = "";
            if (baseJson.has(key)) {
                // Return the value for the key
                return baseJson.getString(key);
            }
            return value;
        }
    }


    /**
     * Custom UDTF: explodes the user array inside a JSON line into individual rows.
     */
    @FunctionHint(output = @DataTypeHint("ROW<id String,name String,begin_time String,email String>"))
    public static class ExplodeFunc extends TableFunction<Row> {

        public void eval(String line,String key){

            JSONObject jsonObject = new JSONObject(line);
            JSONArray jsonArray = new JSONArray(jsonObject.getString(key));
            for(int i = 0;i< jsonArray.length();i++){
                String date_time = jsonArray.getJSONObject(i).getString("begin_time");
                String email = jsonArray.getJSONObject(i).getString("email");
                String id = jsonArray.getJSONObject(i).getString("id");
                String name = jsonArray.getJSONObject(i).getString("name");
                collect(Row.of(id,name,date_time,email));
            }
        }
    }
}

6.5 UDAF in Practice: Many Rows In, One Value Out

Aggregate functions map scalar values from multiple rows to a new scalar value (many in, one out). Aggregate functions rely on an accumulator that is threaded through the aggregation process.



Extend AggregateFunction.

createAccumulator and getValue must be overridden.

An accumulate method must be provided.

A retract method is mandatory only for aggregations over OVER windows.

A merge method is needed for bounded aggregations as well as session-window and sliding-window aggregations (it also helps performance).

Requirement: use a custom UDAF to compute the average score for each course.

The CSV data is as follows:

1,zhangsan,Chinese,80
1,zhangsan,Math,76
1,zhangsan,Science,84
1,zhangsan,Art,90
2,lisi,Chinese,60
2,lisi,Math,78
2,lisi,Science,86
2,lisi,Art,88

The code implementation is as follows:


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class FlinkUdafAggregrate {
    public static void main(String[] args) {
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // removed since Flink 1.14: the Blink planner is the only (and default) planner
                //.inStreamingMode() // streaming mode is the default
                //.inBatchMode()
                .build();

        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        // Register the UDAF
        tableEnvironment.createTemporarySystemFunction("AvgFunc", AvgFunc.class);


        String source_sql = "CREATE TABLE source_score (\n" +
                "  id int,\n" +
                "  name STRING,\n" +
                "  course STRING,\n" +
                "  score Double" +
                ") WITH ( \n " +
                " 'connector' = 'filesystem',\n" +
                " 'path' = 'input/score.csv' , \n" +
                " 'format' = 'csv'\n" +
                ")";

        // Create the source table
        tableEnvironment.executeSql(source_sql);

        tableEnvironment.from("source_score")
                .groupBy($("course"))
                .select($("course"), call("AvgFunc", $("score")).as("avg_score"))
                .execute().print();



        tableEnvironment.executeSql("select course,AvgFunc(score) as avg_score  from source_score group by course")
                .print();

    }

    public static  class AvgFunc  extends AggregateFunction<Double,AvgAccumulator> {
        @Override
        public Double getValue(AvgAccumulator avgAccumulator) {
            if(avgAccumulator.count==0){
                return null;
            }else {
                return avgAccumulator.sum/avgAccumulator.count;
            }
        }
        // Initialize the accumulator
        @Override
        public AvgAccumulator createAccumulator() {
            return new AvgAccumulator();
        }
        // Accumulate each incoming score
        public void accumulate(AvgAccumulator acc,Double score){
            acc.setSum(acc.sum+score);
            acc.setCount(acc.count+1);
        }
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static  class AvgAccumulator {

        public double sum = 0.0;
        public int count = 0;

    }
}

[Copyright notice] This article is original content by a Huawei Cloud community user. When reposting, you must credit the source (Huawei Cloud community) along with the article link and author. If you find suspected plagiarism in the community, report it with evidence to cloudbbs@huaweicloud.com; once verified, the infringing content will be removed.