2021年大数据Flink(十九):案例一 基于时间的滚动和滑动窗口

举报
Lansonli 发表于 2021/09/28 23:50:47 2021/09/28
【摘要】 目录 案例一 基于时间的滚动和滑动窗口 需求 代码实现 案例一 基于时间的滚动和滑动窗口 需求 nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红...

目录

案例一 基于时间的滚动和滑动窗口

需求

代码实现


案例一 基于时间的滚动和滑动窗口

需求

nc -lk 9999

有如下数据表示:

信号灯编号和通过该信号灯的车的数量

9,3

9,2

9,7

4,9

2,6

1,5

2,3

5,7

5,4

需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口

需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口

 

​​​​​​​代码实现


  
  1. package cn.itcast.window;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.common.functions.MapFunction;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
  10. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  11. import org.apache.flink.streaming.api.windowing.time.Time;
  12. /**
  13.  * Author itcast
  14.  * Desc
  15.  * nc -lk 9999
  16.  * 有如下数据表示:
  17.  * 信号灯编号和通过该信号灯的车的数量
  18. 9,3
  19. 9,2
  20. 9,7
  21. 4,9
  22. 2,6
  23. 1,5
  24. 2,3
  25. 5,7
  26. 5,4
  27.  * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
  28.  * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
  29.  */
  30. public class WindowDemo01_TimeWindow {
  31.     public static void main(String[] args) throws Exception {
  32.         //1.env
  33.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  34.         //2.Source
  35.         DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
  36.         //3.Transformation
  37.         //将9,3转为CartInfo(9,3)
  38.         SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
  39.             @Override
  40.             public CartInfo map(String value) throws Exception {
  41.                 String[] arr = value.split(",");
  42.                 return new CartInfo(arr[0], Integer.parseInt(arr[1]));
  43.             }
  44.         });
  45.         //分组
  46.         //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
  47.         // * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口
  48.         //timeWindow(Time size窗口大小, Time slide滑动间隔)
  49.         SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
  50.                 .keyBy(CartInfo::getSensorId)
  51.                 //.timeWindow(Time.seconds(5))//当size==slide,可以只写一个
  52.                 //.timeWindow(Time.seconds(5), Time.seconds(5))
  53.                 .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  54.                 .sum("count");
  55.         // * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口
  56.         SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
  57.                 .keyBy(CartInfo::getSensorId)
  58.                 //.timeWindow(Time.seconds(10), Time.seconds(5))
  59.                 .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  60.                 .sum("count");
  61.         //4.Sink
  62. /*
  63. 1,5
  64. 2,5
  65. 3,5
  66. 4,5
  67. */
  68.         //result1.print();
  69.         result2.print();
  70.         //5.execute
  71.         env.execute();
  72.     }
  73.     @Data
  74.     @AllArgsConstructor
  75.     @NoArgsConstructor
  76.     public static class CartInfo {
  77.         private String sensorId;//信号灯id
  78.         private Integer count;//通过该信号灯的车的数量
  79.     }
  80. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116247545

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。