2021年大数据Flink(二十):案例二 基于数量的滚动和滑动窗口

举报
Lansonli 发表于 2021/09/29 00:20:25 2021/09/29
【摘要】 目录 案例二 基于数量的滚动和滑动窗口 需求 ​​​​​​​代码实现 案例二 基于数量的滚动和滑动窗口 需求 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗...

目录

案例二 基于数量的滚动和滑动窗口

需求

​​​​​​​代码实现


案例二 基于数量的滚动和滑动窗口

需求

需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口

需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口

 

​​​​​​​代码实现


  
  1. package cn.itcast.window;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.common.functions.MapFunction;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. /**
  10.  * Author itcast
  11.  * Desc
  12.  * nc -lk 9999
  13.  * 有如下数据表示:
  14.  * 信号灯编号和通过该信号灯的车的数量
  15. 9,3
  16. 9,2
  17. 9,7
  18. 4,9
  19. 2,6
  20. 1,5
  21. 2,3
  22. 5,7
  23. 5,4
  24.  * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
  25.  * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
  26.  */
  27. public class WindowDemo02_CountWindow {
  28.     public static void main(String[] args) throws Exception {
  29.         //1.env
  30.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  31.         //2.Source
  32.         DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
  33.         //3.Transformation
  34.         //将9,3转为CartInfo(9,3)
  35.         SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
  36.             @Override
  37.             public CartInfo map(String value) throws Exception {
  38.                 String[] arr = value.split(",");
  39.                 return new CartInfo(arr[0], Integer.parseInt(arr[1]));
  40.             }
  41.         });
  42.         //分组
  43.         //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
  44.         // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
  45.         //countWindow(long size, long slide)
  46.         SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
  47.                 .keyBy(CartInfo::getSensorId)
  48.                 //.countWindow(5L, 5L)
  49.                 .countWindow( 5L)
  50.                 .sum("count");
  51.         // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
  52.         //countWindow(long size, long slide)
  53.         SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
  54.                 .keyBy(CartInfo::getSensorId)
  55.                 .countWindow(5L, 3L)
  56.                 .sum("count");
  57.         //4.Sink
  58.         //result1.print();
  59.         /*
  60. 1,1
  61. 1,1
  62. 1,1
  63. 1,1
  64. 2,1
  65. 1,1
  66.          */
  67.         result2.print();
  68.         /*
  69. 1,1
  70. 1,1
  71. 2,1
  72. 1,1
  73. 2,1
  74. 3,1
  75. 4,1
  76.          */
  77.         //5.execute
  78.         env.execute();
  79.     }
  80.     @Data
  81.     @AllArgsConstructor
  82.     @NoArgsConstructor
  83.     public static class CartInfo {
  84.         private String sensorId;//信号灯id
  85.         private Integer count;//通过该信号灯的车的数量
  86.     }
  87. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116278662

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。