Flink实战(五)Flink当中的窗口操作

举报
Maynor学长 发表于 2022/10/27 15:22:41 2022/10/27
【摘要】 5、Flink当中的窗口操作有了时间属性,我们就可以配合窗⼝来完成各种业务的计算,Flink Table API/SQL提供了丰富的窗⼝操作。Flink DataStream已经支持Group WindowsFlink Table API/SQL还支持Over Windows 5.1 window概念streaming 流式计算是一种被设计用于处理无限数据集的处理引擎,无限数据集是指一种随...

5、Flink当中的窗口操作

有了时间属性,我们就可以配合窗⼝来完成各种业务的计算,Flink Table API/SQL提供了丰富的窗⼝操作。

Flink DataStream已经支持Group Windows

Flink Table API/SQL还支持Over Windows

5.1 window概念

  • streaming 流式计算是一种被设计用于处理无限数据集的处理引擎,无限数据集是指一种随时间不断增长的数据,而 window 是一种切割无限数据为有限块进行处理的手段。
  • window 就是将无界流切割成有界流的一种方式,它会将流分发到有限大小的桶(bucket)中进行分析。

image-20201225115317076

5.2 window的类型

  • Window 可以分成两类:
    • TimeWindow
      • 计时窗口: 按照一定时间生成 Window(比如:每10秒)
    • CountWindow
      • 计数窗口:按照指定的数据量生成一个 Window,与时间无关(比如:每100个元素)

1584599129746

  • 窗口类型汇总:

1587473202406

5.3 TimeWindow 分类

  • 对于 TimeWindow,可以根据窗口实现原理的不同分成三类:
    • 滚动窗口(TumblingWindow)
    • 滑动窗口(Sliding Window)
    • 会话窗口(Session Window)

5.3.1 滚动窗口(Tumbling Windows)

  • ==概念==

    • 将数据依据固定的窗口长度对数据进行切片
  • ==特点==

    • 时间对齐,窗口长度固定,没有重叠
  • ==例如==

    • 如果你指定了一个 5 分钟大小的滚动窗口,窗口的创建如下图所示

8afff55d837f455e85228de277d3499f

  • ==适用场景==

    • 适合做 BI 统计等(做每个时间段的聚合计算)

5.3.2 滑动窗口(Sliding Windows)

  • ==概念==

    • 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
  • ==特点==

    • 时间对齐,窗口长度固定,可以有重叠
  • ==例如==

    • 你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据,如下图所示

      1587473504462

    f3bc83dac4a9471dbec1aca4f8606e99

  • ==适用场景==

    • 对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)

5.3.3 会话窗口(Session Windows)

  • ==概念==

    • 由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
  • ==特点==

    • 窗口大小是由数据本身决定,它没有固定的开始和结束时间。
    • 会话窗口根据Session gap间隙切分不同的窗口,当一个窗口在大于Session gap间隙的时间内没有接收到新数据时,窗口将关闭
  • ==例如==

    • 设置的时间gap是6秒,那么,当相邻的记录相差>=6秒时,则触发窗口

      1587473534498

    44654a356735430d84ab7170a7ebecf2

  • ==适用场景==

    • 每个用户在一个独立的session中平均页面访问时长, session 和 session 的间隔时间是15分钟 。

5.3.4、group windows

所谓Group Windows (分组窗口),就是把event按照时间或者计数 分成若干个组,然后在对每个组执行窗口函数,Group Window 从键控来说,分为键控window 和 非键控window,按照窗口事件分配逻辑又分为若干类型。

键控分window和非键控window

按照是否先根据指定的键(字段/属性)分组再基于时间/计数构建的Window,可以把Group Window分为【键控 Window】【⾮键控Window】

有时把键控窗⼝跟翻滚窗⼝/滑动窗⼝搞混淆,其实是事物的不同层⾯:

概念 定义
键控window 先根据指定的键(字段/属性)分组,再基于时间/计数构建的Window(双重分组)
⾮键控Window 不根据指定的键(字段/属性)分组,直接基于时间/计数构建的Window

注意:窗⼝也是⼀种分组

d60542ad0c104ef9abe124f3da4ae63e

5.3.5、按照event分配逻辑分类

时间窗⼝:根据时间对数据流进⾏行行分组切⽚片

翻滚时间窗⼝:Tumbling Time Window

滑动时间窗⼝:Sliding Time Window

会话窗⼝:Session Window

计数窗⼝口:根据元素个数对数据流进⾏行行分组切⽚片

翻滚计数窗:Tumbling CountWindow

滑动计数窗:Sliding CountWindow

注意:时间窗⼝口[start,end),左闭右开

5.4、flinkSQL当中窗口的使用

Flink SQL中通过Group Windows函数来定义分组窗⼝

image-20221005111355008

特别注意:FlinkSQL只⽀持基于时间的分组窗⼝,对于批处理time_attr必须是TIMESTAMP类型的

另外还有⼀些辅助函数,可以在select⼦句中⽤来查询 Group Window 的开始和结束时间戳,以及时间属性 (start,end,rowtime,proctime)。

辅助函数 说明
TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) 返回窗⼝的开始时间戳(start),即窗⼝的下边界时间戳
TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) 返回窗⼝的结束时间戳(end),即窗⼝的上边界之外紧挨着的不包含在 本窗⼝的时间戳注意:结束时间戳【不能】⽤作【后续】基于时间的操作中的⾏时间属 性,例如group window或者over window的聚合操作
TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) 返回窗⼝的上边界时间戳,以rowtime形式返回 注意:结果是rowtime时间属性,【可以】⽤做后续基于时间的操作,例 如group window 或者 over window的聚合操作等
TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval)

注意:必须使⽤与 GROUP BY ⼦句中的窗⼝函数完全相同的参数来调⽤辅助函数。

基本语法如下

CREATE TABLE Orders (
 user BIGINT,
 product STRING,
 amount INT,
 order_time TIMESTAMP(3),
 WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE
) WITH (...);
SELECT
 user,
 TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,//参数必须跟下⾯GROUP BY⼦句中的窗⼝
函数参数⼀致
 SUM(amount) FROM Orders
GROUP BY
 TUMBLE(order_time, INTERVAL '1' DAY),
 user

5.4.1、基于Event-time的滚动窗口实现

现有数据内容如下:表示不同用户购买的商品记录,以下数据都是product_id为1的商品,被不同的用户在不同的时间下单,所花费的金额如下,使用flinkSQL当中的滚动窗口来计算每隔2秒钟的金额的最大值,或者平均值,

product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80

定义javaBean对象如下

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserProduct {

    private Integer product_id;
    private String buyer_name;
    private Long date_time;
    private Double price;
}

启动socket服务,然后将以上数据写入到socket里面去,通过FlinkSQL程序实现数据的处理


import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSQLTumbEvtWindowTime {
    public static void main(String[] args) {
        StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        //2.创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        //3.读取数据
        WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
                .<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
                    @Override
                    public long extractTimestamp(UserProduct t, long l) {
                        return t.getDate_time() * 1000;
                    }
                });
        DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
                .map(event -> {
                    String[] arr = event.split(",");
                    return UserProduct.builder()
                            .product_id(Integer.parseInt(arr[0]))
                            .buyer_name(arr[1])
                            .date_time(Long.valueOf(arr[2]))
                            .price(Double.valueOf(arr[3]))
                            .build();
                }).assignTimestampsAndWatermarks(watermarkStrategy);

        //4.流转换为动态表
        Table table = tEnv.fromDataStream(userProductDataStream,
                $("product_id"),$("buyer_name"),$("price"),$("date_time").rowtime());

        //5.自定义窗口并计算
        Table resultTable = tEnv.sqlQuery("select "+
                "product_id,"+
                "max(price),"+
                "TUMBLE_START(date_time,INTERVAL '5' second) as winstart "+
                "from "+table+" GROUP BY product_id,TUMBLE(date_time,INTERVAL '5' second) ");
        //6.执行Flink
        resultTable.execute().print();
    }
}

5.4.2、基于Event-time的滑动窗口实现

现有数据内容如下:表示不同用户购买的商品记录,以下数据都是product_id为1的商品,被不同的用户在不同的时间下单,所花费的金额如下,使用flinkSQL当中的滑动窗口来计算每隔2秒钟的金额的最大值,或者平均值,

product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80

启动socket服务,然后将以上数据写入到socket里面去,通过FlinkSQL程序实现数据的处理


import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSQLSlideWindowEvtTime {
    public static void main(String[] args) {
        StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        //2.创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        //3.读取数据
        WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
                .<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
                    @Override
                    public long extractTimestamp(UserProduct t, long l) {
                        return t.getDate_time() * 1000;
                    }
                });
        DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
                .map(event -> {
                    String[] arr = event.split(",");
                    return UserProduct.builder()
                            .product_id(Integer.parseInt(arr[0]))
                            .buyer_name(arr[1])
                            .date_time(Long.valueOf(arr[2]))
                            .price(Double.valueOf(arr[3]))
                            .build();
                }).assignTimestampsAndWatermarks(watermarkStrategy);


        Schema schema = Schema.newBuilder()
                .column("product_id", "bigint")
                .column("buyer_name", "String")
                .column("date_time", "Long")
                .column("price", "double")
                .build();


        Table table = tEnv.fromDataStream(userProductDataStream,     $("product_id"),$("buyer_name"),$("price"),$("date_time").rowtime());


        Table resulTable = tEnv.sqlQuery("select product_id,max(price), HOP_START(date_time ,INTERVAL '2' second,INTERVAL '4' second ) " +
                "as winstart from " + table +
                " group by product_id, HOP(date_time, INTERVAL '2' second, INTERVAL '4' second) ");

        resulTable.execute().print();
    }
}

5.4.3、基于Event_time的会话窗口实现

现有数据内容如下:表示不同用户购买的商品记录,以下数据都是product_id为1的商品,被不同的用户在不同的时间下单,所花费的金额如下,使用flinkSQL当中的会话窗口来计算每隔2秒钟的金额的最大值,或者平均值,

product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80

启动socket服务,然后将以上数据写入到socket里面去,通过FlinkSQL程序实现数据的处理


import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSQLSessionWindowEvtTime {
    public static void main(String[] args) {
        StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        //2.创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        //3.读取数据
        WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
                .<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
                    @Override
                    public long extractTimestamp(UserProduct t, long l) {
                        return t.getDate_time() * 1000;
                    }
                });
        DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
                .map(event -> {
                    String[] arr = event.split(",");
                    return UserProduct.builder()
                            .product_id(Integer.parseInt(arr[0]))
                            .buyer_name(arr[1])
                            .date_time(Long.valueOf(arr[2]))
                            .price(Double.valueOf(arr[3]))
                            .build();
                }).assignTimestampsAndWatermarks(watermarkStrategy);

        Table table = tEnv.fromDataStream(userProductDataStream,
                $("product_id"),
                $("buyer_name"),
                $("price"),
                $("date_time").rowtime());

        Table resulTable = tEnv.sqlQuery("select product_id,max(price),SESSION_START( date_time,INTERVAL '5' second ) as winstart from " + table + " group by product_id, SESSION(date_time , INTERVAL '5' second )");
        resulTable.execute().print();
    }
}

5.4.4、Over窗口使用基于时间前置指定值

Over window 聚合是标准 SQL 中已有的(Over 子句),可以在查询的 SELECT 子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。Over windows使用.window(w:overwindows*)子句定义,并在 select()方法中通过别名来引用。例子:

val table = input.window([w: OverWindow] as 'w).select('a, 'b.sum over 'w, 'c.min over 'w)

Table API 提供了 Over 类,来配置 Over 窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义 Over windows。

​ 无界的 over window 是使用常量指定的。也就是说,时间间隔要指定 UNBOUNDED_RANGE,或者行计数间隔要指定 UNBOUNDED_ROW。而有界的 over window 是用间隔的大小指定的。

// 无界的事件时间
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)//无界的处理时间
over window (时间字段"proctime").window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)// 无界的事件时间 Row-count
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)//无界的处理时间 Row-count 
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

使用Over窗口按event-time排序有界向==前5s开窗==,求取最大金额以及平均金额值

数据格式如下

product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80

import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSQLOverWinEvTimeRange {
    public static void main(String[] args) {
        StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        //2.创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        //3.读取数据
        WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
                .<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
                    @Override
                    public long extractTimestamp(UserProduct t, long l) {
                        return t.getDate_time() * 1000;
                    }
                });
        DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
                .map(event -> {
                    String[] arr = event.split(",");
                    return UserProduct.builder()
                            .product_id(Integer.parseInt(arr[0]))
                            .buyer_name(arr[1])
                            .date_time(Long.valueOf(arr[2]))
                            .price(Double.valueOf(arr[3]))
                            .build();
                }).assignTimestampsAndWatermarks(watermarkStrategy);

        Table table = tEnv.fromDataStream(userProductDataStream,
                $("product_id"),
                $("buyer_name"),
                $("price"),
                $("date_time").rowtime());


        //5.自定义窗口并计算
        Table resultTable = tEnv.sqlQuery("select "+
                "product_id,"+
                "max(price) OVER w AS max_price,"+
                "avg(price) OVER w AS avg_price "+
                "from "+table+" WINDOW w AS (\n" +
                " PARTITION BY product_id\n" +
                " ORDER BY date_time\n" +
                " RANGE BETWEEN INTERVAL '5' second PRECEDING AND CURRENT ROW) \n");

        //6.执行Flink
        resultTable.execute().print();
    }
}

5.4.4、Over窗口使用基于数据条数前置

Over window 聚合是标准 SQL 中已有的(Over 子句),可以在查询的 SELECT 子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。Over windows使用.window(w:overwindows*)子句定义,并在 select()方法中通过别名来引用。例子:

val table = input.window([w: OverWindow] as 'w).select('a, 'b.sum over 'w, 'c.min over 'w)

Table API 提供了 Over 类,来配置 Over 窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义 Over windows。

​ 无界的 over window 是使用常量指定的。也就是说,时间间隔要指定 UNBOUNDED_RANGE,或者行计数间隔要指定 UNBOUNDED_ROW。而有界的 over window 是用间隔的大小指定的。

// 无界的事件时间
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)//无界的处理时间
over window (时间字段"proctime").window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)// 无界的事件时间 Row-count
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)//无界的处理时间 Row-count 
over window (时间字段 "rowtime").window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

使用Over窗口按event-time排序有界向==前3条数据==,求取最大金额以及平均金额值

数据格式如下

product_id,buyer_name,date_time,price
1,郑剃,1664841620,68
1,闾丘喜造,1664841622,75
1,王曙介,1664841624,84
1,赖溯姆,1664841626,56
1,钱泼奎,1664841628,74
1,尉迟亏,1664841630,35
1,贾盏,1664841632,53
1,蔡辟,1664841634,45
1,蔡矛,1664841636,38
1,赖妖炬,1664841638,89
1,毛溜孝,1664841640,45
1,邵省充,1664841642,42
1,邓瑟冕,1664841644,68
1,史符先,1664841646,66
1,钟驯,1664841648,80

import cn.flink.bean.UserProduct;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSQLOverWinEvRowRange {
    public static void main(String[] args) {
        StreamExecutionEnvironment senv= StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        //2.创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        //3.读取数据
        WatermarkStrategy<UserProduct> watermarkStrategy = WatermarkStrategy
                .<UserProduct>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<UserProduct>() {
                    @Override
                    public long extractTimestamp(UserProduct t, long l) {
                        return t.getDate_time() * 1000;
                    }
                });
        DataStream<UserProduct> userProductDataStream=senv.socketTextStream("bigdata01",9999)
                .map(event -> {
                    String[] arr = event.split(",");
                    return UserProduct.builder()
                            .product_id(Integer.parseInt(arr[0]))
                            .buyer_name(arr[1])
                            .date_time(Long.valueOf(arr[2]))
                            .price(Double.valueOf(arr[3]))
                            .build();
                }).assignTimestampsAndWatermarks(watermarkStrategy);

        Table table = tEnv.fromDataStream(userProductDataStream,
                $("product_id"),
                $("buyer_name"),
                $("price"),
                $("date_time").rowtime());


        //5.自定义窗口并计算
        Table resultTable = tEnv.sqlQuery("select "+
                "product_id,"+
                "max(price) OVER w AS max_price,"+
                "avg(price) OVER w AS avg_price "+
                "from "+table+" WINDOW w AS (\n" +
                " PARTITION BY product_id\n" +
                " ORDER BY date_time\n" +
                " ROWS BETWEEN  3 PRECEDING AND CURRENT ROW) \n");

        //6.执行Flink
        resultTable.execute().print();
    }
}

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。