Flink实战(五)Flink当中的窗口操作
5、Flink当中的窗口操作
有了时间属性,我们就可以配合窗⼝来完成各种业务的计算,Flink Table API/SQL提供了丰富的窗⼝操作。
Flink DataStream已经支持Group Windows
Flink Table API/SQL还支持Over Windows
5.1 window概念
- streaming 流式计算是一种被设计用于处理无限数据集的处理引擎,无限数据集是指一种随时间不断增长的数据,而 window 是一种切割无限数据为有限块进行处理的手段。
- window 就是将无界流切割成有界流的一种方式,它会将流分发到有限大小的桶(bucket)中进行分析。
5.2 window的类型
- Window 可以分成两类:
- TimeWindow
- 计时窗口: 按照一定时间生成 Window(比如:每10秒)
- CountWindow
- 计数窗口:按照指定的数据量生成一个 Window,与时间无关(比如:每100个元素)
- TimeWindow
- 窗口类型汇总:
5.3 TimeWindow 分类
- 对于 TimeWindow,可以根据窗口实现原理的不同分成三类:
- 滚动窗口(TumblingWindow)
- 滑动窗口(Sliding Window)
- 会话窗口(Session Window)
5.3.1 滚动窗口(Tumbling Windows)
-
==概念==
- 将数据依据固定的窗口长度对数据进行切片
-
==特点==
- 时间对齐,窗口长度固定,没有重叠
-
==例如==
- 如果你指定了一个 5 分钟大小的滚动窗口,窗口的创建如下图所示
-
==适用场景==
- 适合做 BI 统计等(做每个时间段的聚合计算)
5.3.2 滑动窗口(Sliding Windows)
-
==概念==
- 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
-
==特点==
- 时间对齐,窗口长度固定,可以有重叠
-
==例如==
-
你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据,如下图所示
-
-
==适用场景==
- 对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)
5.3.3 会话窗口(Session Windows)
-
==概念==
- 由一系列事件组合一个指定时间长度的
timeout
间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
- 由一系列事件组合一个指定时间长度的
-
==特点==
- 窗口大小是由数据本身决定,它没有固定的开始和结束时间。
- 会话窗口根据Session gap间隙切分不同的窗口,当一个窗口在大于Session gap间隙的时间内没有接收到新数据时,窗口将关闭
-
==例如==
-
设置的时间gap是6秒,那么,当相邻的记录相差>=6秒时,则触发窗口
-
-
==适用场景==
- 每个用户在一个独立的session中平均页面访问时长, session 和 session 的间隔时间是15分钟 。
5.3.4、group windows
所谓Group Windows (分组窗口),就是把event按照时间或者计数 分成若干个组,然后在对每个组执行窗口函数,Group Window 从键控来说,分为键控window 和 非键控window,按照窗口事件分配逻辑又分为若干类型。
键控分window和非键控window
按照是否先根据指定的键(字段/属性)分组再基于时间/计数构建的Window,可以把Group Window分为【键控 Window】【⾮键控Window】
有时把键控窗⼝跟翻滚窗⼝/滑动窗⼝搞混淆,其实是事物的不同层⾯:
概念 | 定义 | |
---|---|---|
键控window | 先根据指定的键(字段/属性)分组,再基于时间/计数构建的Window(双重分组) | |
⾮键控Window | 不根据指定的键(字段/属性)分组,直接基于时间/计数构建的Window |
注意:窗⼝也是⼀种分组
5.3.5、按照event分配逻辑分类
时间窗⼝:根据时间对数据流进⾏行行分组切⽚片
翻滚时间窗⼝:Tumbling Time Window
滑动时间窗⼝:Sliding Time Window
会话窗⼝:Session Window
计数窗⼝口:根据元素个数对数据流进⾏行行分组切⽚片
翻滚计数窗:Tumbling CountWindow
滑动计数窗:Sliding CountWindow
注意:时间窗⼝口[start,end),左闭右开
5.4、flinkSQL当中窗口的使用
Flink SQL中通过Group Windows函数来定义分组窗⼝
特别注意:FlinkSQL只⽀持基于时间的分组窗⼝,对于批处理time_attr必须是TIMESTAMP类型的
另外还有⼀些辅助函数,可以在select⼦句中⽤来查询 Group Window 的开始和结束时间戳,以及时间属性 (start,end,rowtime,proctime)。
辅助函数 | 说明 |
---|---|
TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) | 返回窗⼝的开始时间戳(start),即窗⼝的下边界时间戳 |
TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) | 返回窗⼝的结束时间戳(end),即窗⼝的上边界之外紧挨着的不包含在 本窗⼝的时间戳注意:结束时间戳【不能】⽤作【后续】基于时间的操作中的⾏时间属 性,例如group window或者over window的聚合操作 |
TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) | 返回窗⼝的上边界时间戳,以rowtime形式返回 注意:结果是rowtime时间属性,【可以】⽤做后续基于时间的操作,例 如group window 或者 over window的聚合操作等 |
TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) |
注意:必须使⽤与 GROUP BY ⼦句中的窗⼝函数完全相同的参数来调⽤辅助函数。
基本语法如下
CREATE TABLE Orders (
user BIGINT,
product STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE
) WITH (...);
SELECT
user,
TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,//参数必须跟下⾯GROUP BY⼦句中的窗⼝
函数参数⼀致
SUM(amount) FROM Orders
GROUP BY
TUMBLE(order_time, INTERVAL '1' DAY),
user
5.4.1、基于Event-time的滚动窗口实现
现有数据内容如下:表示不同用户购买的商品记录,以下数据都是product_id为1的商品,被不同的用户在不同的时间下单,所花费的金额如下,使用flinkSQL当中的滚动窗口来计算每隔2秒钟的金额的最大值,或者平均值,
product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80
定义javaBean对象如下
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserProduct {
private Integer product_id;
private String buyer_name;
private Long date_time;
private Double price;
}
启动socket服务,然后将以上数据写入到socket里面去,通过FlinkSQL程序实现数据的处理
import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkSQLTumbEvtWindowTime {
public static void main(String[] args) {
StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
//3.读取数据
WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
.<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
@Override
public long extractTimestamp(UserProduct t, long l) {
return t.getDate_time() * 1000;
}
});
DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
.map(event -> {
String[] arr = event.split(",");
return UserProduct.builder()
.product_id(Integer.parseInt(arr[0]))
.buyer_name(arr[1])
.date_time(Long.valueOf(arr[2]))
.price(Double.valueOf(arr[3]))
.build();
}).assignTimestampsAndWatermarks(watermarkStrategy);
//4.流转换为动态表
Table table = tEnv.fromDataStream(userProductDataStream,
$("product_id"),$("buyer_name"),$("price"),$("date_time").rowtime());
//5.自定义窗口并计算
Table resultTable = tEnv.sqlQuery("select "+
"product_id,"+
"max(price),"+
"TUMBLE_START(date_time,INTERVAL '5' second) as winstart "+
"from "+table+" GROUP BY product_id,TUMBLE(date_time,INTERVAL '5' second) ");
//6.执行Flink
resultTable.execute().print();
}
}
5.4.2、基于Event-time的滑动窗口实现
现有数据内容如下:表示不同用户购买的商品记录,以下数据都是product_id为1的商品,被不同的用户在不同的时间下单,所花费的金额如下,使用flinkSQL当中的滑动窗口来计算每隔2秒钟的金额的最大值,或者平均值,
product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80
启动socket服务,然后将以上数据写入到socket里面去,通过FlinkSQL程序实现数据的处理
import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkSQLSlideWindowEvtTime {
public static void main(String[] args) {
StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
//3.读取数据
WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
.<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
@Override
public long extractTimestamp(UserProduct t, long l) {
return t.getDate_time() * 1000;
}
});
DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
.map(event -> {
String[] arr = event.split(",");
return UserProduct.builder()
.product_id(Integer.parseInt(arr[0]))
.buyer_name(arr[1])
.date_time(Long.valueOf(arr[2]))
.price(Double.valueOf(arr[3]))
.build();
}).assignTimestampsAndWatermarks(watermarkStrategy);
Schema schema = Schema.newBuilder()
.column("product_id", "bigint")
.column("buyer_name", "String")
.column("date_time", "Long")
.column("price", "double")
.build();
Table table = tEnv.fromDataStream(userProductDataStream, $("product_id"),$("buyer_name"),$("price"),$("date_time").rowtime());
Table resulTable = tEnv.sqlQuery("select product_id,max(price), HOP_START(date_time ,INTERVAL '2' second,INTERVAL '4' second ) " +
"as winstart from " + table +
" group by product_id, HOP(date_time, INTERVAL '2' second, INTERVAL '4' second) ");
resulTable.execute().print();
}
}
5.4.3、基于Event_time的会话窗口实现
现有数据内容如下:表示不同用户购买的商品记录,以下数据都是product_id为1的商品,被不同的用户在不同的时间下单,所花费的金额如下,使用flinkSQL当中的会话窗口来计算每隔2秒钟的金额的最大值,或者平均值,
product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80
启动socket服务,然后将以上数据写入到socket里面去,通过FlinkSQL程序实现数据的处理
import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkSQLSessionWindowEvtTime {
public static void main(String[] args) {
StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
//3.读取数据
WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
.<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
@Override
public long extractTimestamp(UserProduct t, long l) {
return t.getDate_time() * 1000;
}
});
DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
.map(event -> {
String[] arr = event.split(",");
return UserProduct.builder()
.product_id(Integer.parseInt(arr[0]))
.buyer_name(arr[1])
.date_time(Long.valueOf(arr[2]))
.price(Double.valueOf(arr[3]))
.build();
}).assignTimestampsAndWatermarks(watermarkStrategy);
Table table = tEnv.fromDataStream(userProductDataStream,
$("product_id"),
$("buyer_name"),
$("price"),
$("date_time").rowtime());
Table resulTable = tEnv.sqlQuery("select product_id,max(price),SESSION_START( date_time,INTERVAL '5' second ) as winstart from " + table + " group by product_id, SESSION(date_time , INTERVAL '5' second )");
resulTable.execute().print();
}
}
5.4.4、Over窗口使用基于时间前置指定值
Over window 聚合是标准 SQL 中已有的(Over 子句),可以在查询的 SELECT 子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。Over windows使用.window(w:overwindows*)子句定义,并在 select()方法中通过别名来引用。例子:
val table = input.window([w: OverWindow] as 'w).select('a, 'b.sum over 'w, 'c.min over 'w)
Table API 提供了 Over 类,来配置 Over 窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义 Over windows。
无界的 over window 是使用常量指定的。也就是说,时间间隔要指定 UNBOUNDED_RANGE
,或者行计数间隔要指定 UNBOUNDED_ROW
。而有界的 over window 是用间隔的大小指定的。
// 无界的事件时间
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)//无界的处理时间
over window (时间字段"proctime").window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)// 无界的事件时间 Row-count
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)//无界的处理时间 Row-count
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
使用Over窗口按event-time排序有界向==前5s开窗==,求取最大金额以及平均金额值
数据格式如下
product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80
import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkSQLOverWinEvTimeRange {
public static void main(String[] args) {
StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
//3.读取数据
WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
.<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
@Override
public long extractTimestamp(UserProduct t, long l) {
return t.getDate_time() * 1000;
}
});
DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
.map(event -> {
String[] arr = event.split(",");
return UserProduct.builder()
.product_id(Integer.parseInt(arr[0]))
.buyer_name(arr[1])
.date_time(Long.valueOf(arr[2]))
.price(Double.valueOf(arr[3]))
.build();
}).assignTimestampsAndWatermarks(watermarkStrategy);
Table table = tEnv.fromDataStream(userProductDataStream,
$("product_id"),
$("buyer_name"),
$("price"),
$("date_time").rowtime());
//5.自定义窗口并计算
Table resultTable = tEnv.sqlQuery("select "+
"product_id,"+
"max(price) OVER w AS max_price,"+
"avg(price) OVER w AS avg_price "+
"from "+table+" WINDOW w AS (\n" +
" PARTITION BY product_id\n" +
" ORDER BY date_time\n" +
" RANGE BETWEEN INTERVAL '5' second PRECEDING AND CURRENT ROW) \n");
//6.执行Flink
resultTable.execute().print();
}
}
5.4.4、Over窗口使用基于数据条数前置
Over window 聚合是标准 SQL 中已有的(Over 子句),可以在查询的 SELECT 子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。Over windows使用.window(w:overwindows*)子句定义,并在 select()方法中通过别名来引用。例子:
val table = input.window([w: OverWindow] as 'w).select('a, 'b.sum over 'w, 'c.min over 'w)
Table API 提供了 Over 类,来配置 Over 窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义 Over windows。
无界的 over window 是使用常量指定的。也就是说,时间间隔要指定 UNBOUNDED_RANGE
,或者行计数间隔要指定 UNBOUNDED_ROW
。而有界的 over window 是用间隔的大小指定的。
// 无界的事件时间
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)//无界的处理时间
over window (时间字段"proctime").window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)// 无界的事件时间 Row-count
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)//无界的处理时间 Row-count
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
使用Over窗口按event-time排序有界向==前3条数据==,求取最大金额以及平均金额值
数据格式如下
product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80
import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkSQLOverWinEvRowRange {
public static void main(String[] args) {
StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
//3.读取数据
WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
.<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
@Override
public long extractTimestamp(UserProduct t, long l) {
return t.getDate_time() * 1000;
}
});
DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
.map(event -> {
String[] arr = event.split(",");
return UserProduct.builder()
.product_id(Integer.parseInt(arr[0]))
.buyer_name(arr[1])
.date_time(Long.valueOf(arr[2]))
.price(Double.valueOf(arr[3]))
.build();
}).assignTimestampsAndWatermarks(watermarkStrategy);
Table table = tEnv.fromDataStream(userProductDataStream,
$("product_id"),
$("buyer_name"),
$("price"),
$("date_time").rowtime());
//5.自定义窗口并计算
Table resultTable = tEnv.sqlQuery("select "+
"product_id,"+
"max(price) OVER w AS max_price,"+
"avg(price) OVER w AS avg_price "+
"from "+table+" WINDOW w AS (\n" +
" PARTITION BY product_id\n" +
" ORDER BY date_time\n" +
" ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) \n");
//6.执行Flink
resultTable.execute().print();
}
}
- 点赞
- 收藏
- 关注作者
评论(0)