Flink in Action (6): Functions in FlinkSQL
6. Functions in FlinkSQL
FlinkSQL ships with a large number of built-in functions, and writing SQL is largely a matter of applying functions. This section gives a unified introduction to the built-in functions and to user-defined functions.
6.1 Built-in Function Overview
The Flink Table API/SQL provides a large number of built-in functions; treat the reference below as a dictionary and look them up as you need them:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/systemfunctions/
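As a quick taste, a few common built-ins in one query (a sketch; the users table and its columns are illustrative, and date_time is assumed to be a TIMESTAMP):

tableEnvironment.sqlQuery(
        "SELECT UPPER(name), " +
        "       COALESCE(email, 'n/a'), " +
        "       DATE_FORMAT(date_time, 'yyyy-MM-dd') " +
        "FROM users").execute().print();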
6.2 User-Defined Functions in FlinkSQL
Besides the built-ins, FlinkSQL also supports user-defined functions. Let's look at how to implement them.
Categories of user-defined functions
Hive distinguishes UDF, UDAF, and UDTF custom functions. FlinkSQL streamlines these concepts into Scalar Functions, Table Functions, Aggregate Functions, and so on. The table below summarizes the function types in FlinkSQL:
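| Function type | Semantics | Hive counterpart |
| --- | --- | --- |
| Scalar Function | one row in, one value out | UDF |
| Table Function | one row in, multiple rows/columns out | UDTF |
| Aggregate Function | multiple rows in, one value out | UDAF |
| Table Aggregate Function | multiple rows in, multiple rows/columns out | (no direct counterpart) |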
6.2.1 How User-Defined Functions Are Called
A user-defined function can be invoked inline via call(), or registered under a name and then called from the Table API or SQL.
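A minimal sketch of both styles, assuming an existing TableEnvironment tableEnv, a table MyTable with a STRING column name, and an illustrative ScalarFunction subclass MyUpperFunc:

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

// 1. Inline call in the Table API, no registration needed:
tableEnv.from("MyTable").select(call(MyUpperFunc.class, $("name")));

// 2. Register under a name, then call by that name from the Table API or SQL:
tableEnv.createTemporarySystemFunction("MyUpper", MyUpperFunc.class);
tableEnv.from("MyTable").select(call("MyUpper", $("name")));
tableEnv.sqlQuery("SELECT MyUpper(name) FROM MyTable");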
6.2.2 How User-Defined Functions Are Implemented
To define a function, extend the corresponding base class, e.g. ScalarFunction. The class must be declared public, non-abstract, and globally accessible; non-static inner classes and anonymous classes are therefore not allowed. It must also have a default constructor, because Flink needs to instantiate it and register it in the catalog.
It must provide a public eval method with well-defined parameters (overloading, varargs, and inheritance are all allowed).
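For instance, a minimal sketch (the class is illustrative) showing that eval can be overloaded and may use varargs:

import org.apache.flink.table.functions.ScalarFunction;

// Public, non-abstract, globally accessible, with a default constructor.
public class ToStringFunc extends ScalarFunction {

    // One overload per argument type; Flink picks the matching eval.
    public String eval(Integer value) {
        return String.valueOf(value);
    }

    public String eval(Double value) {
        return String.valueOf(value);
    }

    // Varargs are supported as well.
    public String eval(String... values) {
        return String.join(",", values);
    }
}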
6.2.3 Initialization with open and Cleanup with close
The open and close methods of the UDF base classes can be overridden for custom initialization and cleanup logic. open receives a FunctionContext parameter through which various pieces of information about the runtime environment can be obtained:
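A minimal sketch, assuming a job parameter named hashcode_factor and a metric named hashcode_calls (both names are illustrative):

import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class HashCodeFunction extends ScalarFunction {

    private int factor;
    private transient Counter callCounter;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Read a job parameter, falling back to a default value.
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
        // Register a metric that counts invocations.
        callCounter = context.getMetricGroup().counter("hashcode_calls");
    }

    public int eval(String s) {
        callCounter.inc();
        return s.hashCode() * factor;
    }

    @Override
    public void close() throws Exception {
        // Release external resources here (connections, caches, ...).
    }
}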
6.3 Scalar Function in Practice: One In, One Out (UDF)
We use a custom ScalarFunction to implement JSON parsing: given a JSON line and a key, it returns the value of that key.
The JSON data looks like this:
{"date_time":"2022-10-04 08:01:48","email":"kyzqcd0686@vjikq.tng","id":0,"name":"郑剃"}
{"date_time":"2022-10-04 08:06:31","email":"bvkqwbmgwi@lh80q.4ln","id":1,"name":"闾丘喜造"}
{"date_time":"2022-10-04 08:04:39","email":"axvcbj7vbo@ecyi1.4gw","id":2,"name":"王曙介"}
{"date_time":"2022-10-04 08:00:19","email":"ew1qu5sunz@caxtg.vtn","id":3,"name":"赖溯姆"}
Define the JSON parsing function:
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public class JsonParseFunction extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        // Nothing to initialize for this function.
    }

    /**
     * Returns the value of the given key from a JSON line,
     * or an empty string if the key is absent.
     */
    public String eval(String jsonLine, String key) {
        JSONObject jsonObject = JSONObject.parseObject(jsonLine);
        if (jsonObject.containsKey(key)) {
            return jsonObject.getString(key);
        } else {
            return "";
        }
    }

    @Override
    public void close() throws Exception {
        // Nothing to clean up.
    }
}
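Note that this function uses Alibaba fastjson, so the com.alibaba:fastjson dependency must be available on the classpath.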
The driver code is as follows:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
public class FlinkSQLScalarFunction {

    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);

        // 1. Create the TableEnvironment.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // since Flink 1.14 the Blink planner is the only one left, and the default
                //.inStreamingMode() // streaming mode is the default
                .inBatchMode()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        // 2. Register the custom function under the name JsonParse.
        tableEnvironment.createTemporarySystemFunction("JsonParse", JsonParseFunction.class);

        // 3. Create a source table over the JSON file.
        String source_sql = "CREATE TABLE json_table (\n" +
                "  line STRING \n" +
                ") WITH (\n" +
                "  'connector'='filesystem',\n" +
                "  'path'='input/userbase.json',\n" +
                "  'format'='raw'\n" +
                ")";
        tableEnvironment.executeSql(source_sql);

        // 4. Query the raw lines, then apply the UDF to extract each field.
        tableEnvironment.sqlQuery("select * from json_table").execute().print();
        tableEnvironment.sqlQuery("select JsonParse(line,'date_time'), JsonParse(line,'email'), " +
                "JsonParse(line,'id'), JsonParse(line,'name') from json_table")
                .execute().print();
    }
}
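Note: with format 'raw', every record arrives as a single STRING column (line here), which is why the JSON is parsed inside the UDF rather than by the format itself.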
6.4 Table Function in Practice: One In, Many Out (UDTF)
We can also define a function that produces multiple output rows from a single input row, much like an explode function.
Table functions take one row in and emit many rows out ("explode"). Extend TableFunction, provide an eval method with no return value, and emit rows with collect.
The result of a table function is a table, which has to be joined back to the original table to obtain the final result; this is what lateral tables are for (look up LATERAL TABLE if this is unfamiliar). A sketch of the two join styles follows.
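A minimal sketch, assuming a registered table function func and a table t with a column line (all names are illustrative):

// Cross join: input rows for which the function emits nothing are dropped.
tableEnv.sqlQuery("SELECT a, b FROM t, LATERAL TABLE(func(line)) AS s(a, b)");

// Left outer join: such rows are kept and padded with NULLs; ON TRUE is required.
tableEnv.sqlQuery(
        "SELECT a, b FROM t LEFT JOIN LATERAL TABLE(func(line)) AS s(a, b) ON TRUE");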
This time the JSON data looks as follows. userBaseList is an array holding several user records; we use a UDTF to extract each user record:
{"date_time":1665145907806,"price":258.7,"productId":920956185,"userBaseList":[{"begin_time":"2022-10-07 08:38:31","email":"njvjeuchpe@mk1t0.d4e","id":"0","name":"尹修彻"},{"begin_time":"2022-10-07 08:33:59","email":"qurkb119uo@fvyg5.kqj","id":"1","name":"萧幅括"},{"begin_time":"2022-10-07 08:37:40","email":"i4w8ecponz@bpoay.3yv","id":"2","name":"胡乘"},{"begin_time":"2022-10-07 08:38:05","email":"uwl7fpfwbb@b7riu.fh3","id":"3","name":"黄煎"},{"begin_time":"2022-10-07 08:37:12","email":"bjjqrvajih@c75ur.lhs","id":"4","name":"袁肇"}]}
{"date_time":1665145918652,"price":258.7,"productId":-786075263,"userBaseList":[{"begin_time":"2022-10-07 08:39:47","email":"yfynwlektk@sz0me.hys","id":"0","name":"程痢"},{"begin_time":"2022-10-07 08:32:03","email":"jwpwuiwdnc@esxbd.hta","id":"1","name":"程盐殃"},{"begin_time":"2022-10-07 08:40:17","email":"fbfnidktqg@zaxxw.g1w","id":"2","name":"蔡锻"},{"begin_time":"2022-10-07 08:35:24","email":"twrm30opcb@5rgzj.sow","id":"3","name":"李猎甩"},{"begin_time":"2022-10-07 08:33:05","email":"rnkabnvaz9@bt319.xlk","id":"4","name":"夏焙匈"}]}
{"date_time":1665145927285,"price":258.7,"productId":-988723330,"userBaseList":[{"begin_time":"2022-10-07 08:37:04","email":"pcs8ejgibk@kxf95.djq","id":"0","name":"郝疯框"},{"begin_time":"2022-10-07 08:40:20","email":"n63k5twind@eddbg.aui","id":"1","name":"万侨"},{"begin_time":"2022-10-07 08:33:52","email":"1xmk0vh3bb@1htg2.tw2","id":"2","name":"侯临迸"},{"begin_time":"2022-10-07 08:33:05","email":"cnrqk4crpy@svhkq.wwf","id":"3","name":"闾丘耘"},{"begin_time":"2022-10-07 08:34:26","email":"ubozcxmrxc@c6qpp.8ug","id":"4","name":"皇甫坡"}]}
The code is as follows:
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class FlinkSQLTableFunction {

    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);

        // 1. Create the TableEnvironment.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // since Flink 1.14 the Blink planner is the only one left, and the default
                //.inStreamingMode() // streaming mode is the default
                .inBatchMode()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        // 2. Register the scalar function and the table function.
        tableEnvironment.createTemporarySystemFunction("JsonFunc", JsonFunction.class);
        tableEnvironment.createTemporarySystemFunction("explodeFunc", ExplodeFunc.class);

        // 3. Create a source table over the JSON file.
        String source_sql = "CREATE TABLE json_table (\n" +
                "  line STRING \n" +
                ") WITH (\n" +
                "  'connector'='filesystem',\n" +
                "  'path'='input/product_user.json',\n" +
                "  'format'='raw'\n" +
                ")";
        tableEnvironment.executeSql(source_sql);

        // Option 1: Table API with an inner (cross) join.
        tableEnvironment.from("json_table")
                .joinLateral(call(ExplodeFunc.class, $("line"), "userBaseList")
                        .as("id", "name", "begin_time", "email"))
                .select(call(JsonFunction.class, $("line"), "date_time"),
                        call(JsonFunction.class, $("line"), "price"),
                        call(JsonFunction.class, $("line"), "productId"),
                        $("id"),
                        $("name"),
                        $("begin_time"),
                        $("email"))
                .execute().print();

        // Option 2: Table API with a left outer join.
        tableEnvironment.from("json_table")
                .leftOuterJoinLateral(call(ExplodeFunc.class, $("line"), "userBaseList")
                        .as("id", "name", "begin_time", "email"))
                .select(call(JsonFunction.class, $("line"), "date_time"),
                        call(JsonFunction.class, $("line"), "price"),
                        call(JsonFunction.class, $("line"), "productId"),
                        $("id"),
                        $("name"),
                        $("begin_time"),
                        $("email"))
                .execute().print();

        // Option 3: FlinkSQL with an inner (cross) join.
        tableEnvironment.sqlQuery("select " +
                "JsonFunc(line,'date_time')," +
                "JsonFunc(line,'price')," +
                "JsonFunc(line,'productId')," +
                "id," +
                "name," +
                "begin_time," +
                "email" +
                " from json_table" +
                ", lateral table(explodeFunc(line,'userBaseList'))"
        ).execute().print();

        // Option 4: FlinkSQL with a left outer join.
        tableEnvironment.sqlQuery("select " +
                "JsonFunc(line,'date_time') as date_time," +
                "JsonFunc(line,'price') as price," +
                "JsonFunc(line,'productId') as productId," +
                "id," +
                "name," +
                "begin_time," +
                "email" +
                " from json_table left join lateral table (explodeFunc(line,'userBaseList')) as sc(id,name,begin_time,email) on true"
        ).execute().print();
    }

    /**
     * Custom scalar function (UDF): extracts the value of a key from a JSON line.
     */
    public static class JsonFunction extends ScalarFunction {
        public String eval(String line, String key) {
            // Parse the line into a JSON object.
            JSONObject baseJson = new JSONObject(line);
            String value = "";
            if (baseJson.has(key)) {
                // Return the value for the key.
                return baseJson.getString(key);
            }
            return value;
        }
    }

    /**
     * Custom table function (UDTF): explodes the user array into one row per user.
     */
    @FunctionHint(output = @DataTypeHint("ROW<id STRING, name STRING, begin_time STRING, email STRING>"))
    public static class ExplodeFunc extends TableFunction<Row> {
        public void eval(String line, String key) {
            JSONObject jsonObject = new JSONObject(line);
            JSONArray jsonArray = new JSONArray(jsonObject.getString(key));
            for (int i = 0; i < jsonArray.length(); i++) {
                String beginTime = jsonArray.getJSONObject(i).getString("begin_time");
                String email = jsonArray.getJSONObject(i).getString("email");
                String id = jsonArray.getJSONObject(i).getString("id");
                String name = jsonArray.getJSONObject(i).getString("name");
                // Emit one row per user in the array.
                collect(Row.of(id, name, beginTime, email));
            }
        }
    }
}
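The @FunctionHint annotation on ExplodeFunc declares the row type the function produces, so Flink can derive the schema of the lateral table without inspecting the generic Row type.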
6.5 Aggregate Function in Practice: Many In, One Out (UDAF)
Aggregate functions map the scalar values of multiple rows to a new scalar value (many in, one out). Aggregation is driven by an accumulator: it is created once, updated for every input row, and finally read out to produce the result.
To implement one:
- Extend AggregateFunction.
- createAccumulator and getValue must be overridden.
- An accumulate method must be provided.
- A retract method is mandatory only for OVER windows.
- A merge method is needed for bounded aggregations as well as session and sliding window aggregations (it also helps performance).
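A minimal sketch (separate from the example below; names are illustrative) of what retract and merge look like for an average aggregate:

import org.apache.flink.table.functions.AggregateFunction;

public class AvgWithRetractFunc extends AggregateFunction<Double, AvgWithRetractFunc.Acc> {

    // Simple POJO accumulator: running sum and row count.
    public static class Acc {
        public double sum = 0.0;
        public long count = 0L;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Double getValue(Acc acc) {
        return acc.count == 0 ? null : acc.sum / acc.count;
    }

    // Called once per input row.
    public void accumulate(Acc acc, Double score) {
        acc.sum += score;
        acc.count += 1;
    }

    // Needed for OVER windows: withdraws a previously accumulated row.
    public void retract(Acc acc, Double score) {
        acc.sum -= score;
        acc.count -= 1;
    }

    // Needed for bounded and session/sliding window aggregations:
    // folds several accumulators into one.
    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }
}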
Requirement: use a custom UDAF to compute the average score of each course.
The CSV data is as follows:
1,zhangsan,Chinese,80
1,zhangsan,Math,76
1,zhangsan,Science,84
1,zhangsan,Art,90
2,lisi,Chinese,60
2,lisi,Math,78
2,lisi,Science,86
2,lisi,Art,88
The code is as follows:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
public class FlinkUdafAggregrate {

    public static void main(String[] args) {
        // 1. Create the TableEnvironment.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.useBlinkPlanner() // since Flink 1.14 the Blink planner is the only one left, and the default
                //.inStreamingMode() // streaming mode is the default
                //.inBatchMode()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        // Register the aggregate function.
        tableEnvironment.createTemporarySystemFunction("AvgFunc", AvgFunc.class);

        String source_sql = "CREATE TABLE source_score (\n" +
                "  id int,\n" +
                "  name STRING,\n" +
                "  course STRING,\n" +
                "  score Double" +
                ") WITH ( \n " +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'input/score.csv' , \n" +
                "  'format' = 'csv'\n" +
                ")";
        // Create the source table.
        tableEnvironment.executeSql(source_sql);

        // Table API: group by course and apply the UDAF.
        tableEnvironment.from("source_score")
                .groupBy($("course"))
                .select($("course"), call("AvgFunc", $("score")).as("avg_score"))
                .execute().print();

        // SQL: the registered function can be used like any built-in aggregate.
        tableEnvironment.executeSql("select course, AvgFunc(score) as avg_score from source_score group by course")
                .print();
    }

    public static class AvgFunc extends AggregateFunction<Double, AvgAccumulator> {

        @Override
        public Double getValue(AvgAccumulator avgAccumulator) {
            if (avgAccumulator.count == 0) {
                return null;
            } else {
                return avgAccumulator.sum / avgAccumulator.count;
            }
        }

        // Initialize the accumulator.
        @Override
        public AvgAccumulator createAccumulator() {
            return new AvgAccumulator();
        }

        // Accumulate one row.
        public void accumulate(AvgAccumulator acc, Double score) {
            acc.setSum(acc.sum + score);
            acc.setCount(acc.count + 1);
        }
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class AvgAccumulator {
        public double sum = 0.0;
        public int count = 0;
    }
}