2021年大数据Flink(二十):案例二 基于数量的滚动和滑动窗口
        【摘要】 
                    目录 
案例二 基于数量的滚动和滑动窗口 
需求 
代码实现 
案例二 基于数量的滚动和滑动窗口 
需求 
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗...
    
    
    
    目录
案例二 基于数量的滚动和滑动窗口
需求
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
代码实现
  
   - 
    
     
    
    
     
      package cn.itcast.window;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      import lombok.AllArgsConstructor;
     
    
 
   - 
    
     
    
    
     
      import lombok.Data;
     
    
 
   - 
    
     
    
    
     
      import lombok.NoArgsConstructor;
     
    
 
   - 
    
     
    
    
     
      import org.apache.flink.api.common.functions.MapFunction;
     
    
 
   - 
    
     
    
    
     
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
     
    
 
   - 
    
     
    
    
     
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
     
    
 
   - 
    
     
    
    
     
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      /**
     
    
 
   - 
    
     
    
    
     
       * Author itcast
     
    
 
   - 
    
     
    
    
     
       * Desc
     
    
 
   - 
    
     
    
    
     
       * nc -lk 9999
     
    
 
   - 
    
     
    
    
     
       * 有如下数据表示:
     
    
 
   - 
    
     
    
    
     
       * 信号灯编号和通过该信号灯的车的数量
     
    
 
   - 
    
     
    
    
     
      9,3
     
    
 
   - 
    
     
    
    
     
      9,2
     
    
 
   - 
    
     
    
    
     
      9,7
     
    
 
   - 
    
     
    
    
     
      4,9
     
    
 
   - 
    
     
    
    
     
      2,6
     
    
 
   - 
    
     
    
    
     
      1,5
     
    
 
   - 
    
     
    
    
     
      2,3
     
    
 
   - 
    
     
    
    
     
      5,7
     
    
 
   - 
    
     
    
    
     
      5,4
     
    
 
   - 
    
     
    
    
     
       * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
     
    
 
   - 
    
     
    
    
     
       * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
     
    
 
   - 
    
     
    
    
     
       */
     
    
 
   - 
    
     
    
    
     
      public class WindowDemo02_CountWindow {
     
    
 
   - 
    
     
    
    
     
          public static void main(String[] args) throws Exception {
     
    
 
   - 
    
     
    
    
     
              //1.env
     
    
 
   - 
    
     
    
    
     
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     
    
 
   - 
    
     
    
    
     
              //2.Source
     
    
 
   - 
    
     
    
    
     
              DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
              //3.Transformation
     
    
 
   - 
    
     
    
    
     
              //将9,3转为CartInfo(9,3)
     
    
 
   - 
    
     
    
    
     
              SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
     
    
 
   - 
    
     
    
    
     
                  @Override
     
    
 
   - 
    
     
    
    
     
                  public CartInfo map(String value) throws Exception {
     
    
 
   - 
    
     
    
    
     
                      String[] arr = value.split(",");
     
    
 
   - 
    
     
    
    
     
                      return new CartInfo(arr[0], Integer.parseInt(arr[1]));
     
    
 
   - 
    
     
    
    
     
                  }
     
    
 
   - 
    
     
    
    
     
              });
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
              //分组
     
    
 
   - 
    
     
    
    
     
              //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
              // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
     
    
 
   - 
    
     
    
    
     
              //countWindow(long size, long slide)
     
    
 
   - 
    
     
    
    
     
              SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
     
    
 
   - 
    
     
    
    
     
                      .keyBy(CartInfo::getSensorId)
     
    
 
   - 
    
     
    
    
     
                      //.countWindow(5L, 5L)
     
    
 
   - 
    
     
    
    
     
                      .countWindow( 5L)
     
    
 
   - 
    
     
    
    
     
                      .sum("count");
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
              // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
     
    
 
   - 
    
     
    
    
     
              //countWindow(long size, long slide)
     
    
 
   - 
    
     
    
    
     
              SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
     
    
 
   - 
    
     
    
    
     
                      .keyBy(CartInfo::getSensorId)
     
    
 
   - 
    
     
    
    
     
                      .countWindow(5L, 3L)
     
    
 
   - 
    
     
    
    
     
                      .sum("count");
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
              //4.Sink
     
    
 
   - 
    
     
    
    
     
              //result1.print();
     
    
 
   - 
    
     
    
    
     
              /*
     
    
 
   - 
    
     
    
    
     
      1,1
     
    
 
   - 
    
     
    
    
     
      1,1
     
    
 
   - 
    
     
    
    
     
      1,1
     
    
 
   - 
    
     
    
    
     
      1,1
     
    
 
   - 
    
     
    
    
     
      2,1
     
    
 
   - 
    
     
    
    
     
      1,1
     
    
 
   - 
    
     
    
    
     
               */
     
    
 
   - 
    
     
    
    
     
              result2.print();
     
    
 
   - 
    
     
    
    
     
              /*
     
    
 
   - 
    
     
    
    
     
      1,1
     
    
 
   - 
    
     
    
    
     
      1,1
     
    
 
   - 
    
     
    
    
     
      2,1
     
    
 
   - 
    
     
    
    
     
      1,1
     
    
 
   - 
    
     
    
    
     
      2,1
     
    
 
   - 
    
     
    
    
     
      3,1
     
    
 
   - 
    
     
    
    
     
      4,1
     
    
 
   - 
    
     
    
    
     
               */
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
              //5.execute
     
    
 
   - 
    
     
    
    
     
              env.execute();
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
     
          @Data
     
    
 
   - 
    
     
    
    
     
          @AllArgsConstructor
     
    
 
   - 
    
     
    
    
     
          @NoArgsConstructor
     
    
 
   - 
    
     
    
    
     
          public static class CartInfo {
     
    
 
   - 
    
     
    
    
     
              private String sensorId;//信号灯id
     
    
 
   - 
    
     
    
    
     
              private Integer count;//通过该信号灯的车的数量
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116278662
        【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
            cloudbbs@huaweicloud.com
        
        
        
        
        
        
        - 点赞
 - 收藏
 - 关注作者
 
            
           
评论(0)