2021年大数据Flink(二十):案例二 基于数量的滚动和滑动窗口

举报
Lansonli 发表于 2021/09/29 00:20:25 2021/09/29
2.3k+ 0 0
【摘要】 目录 案例二 基于数量的滚动和滑动窗口 需求 ​​​​​​​代码实现 案例二 基于数量的滚动和滑动窗口 需求 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗...

目录

案例二 基于数量的滚动和滑动窗口

需求

​​​​​​​代码实现


案例二 基于数量的滚动和滑动窗口

需求

需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口

需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口

​​​​​​​代码实现


      package cn.itcast.window;
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      /**
       * Author itcast
       * Desc
       * nc -lk 9999
       * 有如下数据表示:
       * 信号灯编号和通过该信号灯的车的数量
      9,3
      9,2
      9,7
      4,9
      2,6
      1,5
      2,3
      5,7
      5,4
       * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
       * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
       */
      public class WindowDemo02_CountWindow {
          public static void main(String[] args) throws Exception {
              //1.env
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              //2.Source
              DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
              //3.Transformation
              //将9,3转为CartInfo(9,3)
              SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
                  @Override
                  public CartInfo map(String value) throws Exception {
                      String[] arr = value.split(",");
                      return new CartInfo(arr[0], Integer.parseInt(arr[1]));
                  }
              });
              //分组
              //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
              // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
              //countWindow(long size, long slide)
              SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
                      .keyBy(CartInfo::getSensorId)
                      //.countWindow(5L, 5L)
                      .countWindow( 5L)
                      .sum("count");
              // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
              //countWindow(long size, long slide)
              SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
                      .keyBy(CartInfo::getSensorId)
                      .countWindow(5L, 3L)
                      .sum("count");
              //4.Sink
              //result1.print();
              /*
      1,1
      1,1
      1,1
      1,1
      2,1
      1,1
               */
              result2.print();
              /*
      1,1
      1,1
      2,1
      1,1
      2,1
      3,1
      4,1
               */
              //5.execute
              env.execute();
          }
          @Data
          @AllArgsConstructor
          @NoArgsConstructor
          public static class CartInfo {
              private String sensorId;//信号灯id
              private Integer count;//通过该信号灯的车的数量
          }
      }
  
 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116278662

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。