2021年大数据Flink(二十四):Allowed Lateness案例演示
        【摘要】 
                    Allowed Lateness案例演示 
需求 
有订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间) 
要求每隔5s,计算5秒内(基于事件时间的滚动窗口),每个用户的订单总金额 
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。 
并使用OutputTag+allowedLateness解决数据丢失问题 
...
    
    
    
    Allowed Lateness案例演示
需求
有订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
要求每隔5s,计算5秒内(基于事件时间的滚动窗口),每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
并使用OutputTag+allowedLateness解决数据丢失问题
API

  
   - 
    
     
    
    
     
      package cn.itcast.watermaker;
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
      import lombok.AllArgsConstructor;
     
    
- 
    
     
    
    
     
      import lombok.Data;
     
    
- 
    
     
    
    
     
      import lombok.NoArgsConstructor;
     
    
- 
    
     
    
    
     
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
     
    
- 
    
     
    
    
     
      import org.apache.flink.api.common.typeinfo.TypeInformation;
     
    
- 
    
     
    
    
     
      import org.apache.flink.streaming.api.datastream.DataStream;
     
    
- 
    
     
    
    
     
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
     
    
- 
    
     
    
    
     
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
     
    
- 
    
     
    
    
     
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
     
    
- 
    
     
    
    
     
      import org.apache.flink.streaming.api.functions.source.SourceFunction;
     
    
- 
    
     
    
    
     
      import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
     
    
- 
    
     
    
    
     
      import org.apache.flink.streaming.api.windowing.time.Time;
     
    
- 
    
     
    
    
     
      import org.apache.flink.util.OutputTag;
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
      import java.time.Duration;
     
    
- 
    
     
    
    
     
      import java.util.Random;
     
    
- 
    
     
    
    
     
      import java.util.UUID;
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
      /**
     
    
- 
    
     
    
    
     
       * Author itcast
     
    
- 
    
     
    
    
     
       * Desc
     
    
- 
    
     
    
    
     
       * 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
     
    
- 
    
     
    
    
     
       * 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
     
    
- 
    
     
    
    
     
       * 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
     
    
- 
    
     
    
    
     
       */
     
    
- 
    
     
    
    
     
      public class WatermakerDemo03_AllowedLateness {
     
    
- 
    
     
    
    
     
          public static void main(String[] args) throws Exception {
     
    
- 
    
     
    
    
     
              //1.env
     
    
- 
    
     
    
    
     
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     
    
- 
    
     
    
    
     
              //2.Source
     
    
- 
    
     
    
    
     
              //模拟实时订单数据(数据有延迟和乱序)
     
    
- 
    
     
    
    
     
              DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
     
    
- 
    
     
    
    
     
                  private boolean flag = true;
     
    
- 
    
     
    
    
     
                  @Override
     
    
- 
    
     
    
    
     
                  public void run(SourceContext<Order> ctx) throws Exception {
     
    
- 
    
     
    
    
     
                      Random random = new Random();
     
    
- 
    
     
    
    
     
                      while (flag) {
     
    
- 
    
     
    
    
     
                          String orderId = UUID.randomUUID().toString();
     
    
- 
    
     
    
    
     
                          int userId = random.nextInt(3);
     
    
- 
    
     
    
    
     
                          int money = random.nextInt(100);
     
    
- 
    
     
    
    
     
                          //模拟数据延迟和乱序!
     
    
- 
    
     
    
    
     
                          long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
     
    
- 
    
     
    
    
     
                          ctx.collect(new Order(orderId, userId, money, eventTime));
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
                          //TimeUnit.SECONDS.sleep(1);
     
    
- 
    
     
    
    
     
                      }
     
    
- 
    
     
    
    
     
                  }
     
    
- 
    
     
    
    
     
                  @Override
     
    
- 
    
     
    
    
     
                  public void cancel() {
     
    
- 
    
     
    
    
     
                      flag = false;
     
    
- 
    
     
    
    
     
                  }
     
    
- 
    
     
    
    
     
              });
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
              //3.Transformation
     
    
- 
    
     
    
    
     
              DataStream<Order> watermakerDS = orderDS
     
    
- 
    
     
    
    
     
                      .assignTimestampsAndWatermarks(
     
    
- 
    
     
    
    
     
                              WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
     
    
- 
    
     
    
    
     
                                      .withTimestampAssigner((event, timestamp) -> event.getEventTime())
     
    
- 
    
     
    
    
     
                      );
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
              //代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
     
    
- 
    
     
    
    
     
              //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
     
    
- 
    
     
    
    
     
              OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class));
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
              SingleOutputStreamOperator<Order> result = watermakerDS
     
    
- 
    
     
    
    
     
                      .keyBy(Order::getUserId)
     
    
- 
    
     
    
    
     
                      //.timeWindow(Time.seconds(5), Time.seconds(5))
     
    
- 
    
     
    
    
     
                      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
     
    
- 
    
     
    
    
     
                      .allowedLateness(Time.seconds(5))
     
    
- 
    
     
    
    
     
                      .sideOutputLateData(outputTag)
     
    
- 
    
     
    
    
     
                      .sum("money");
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
              DataStream<Order> result2 = result.getSideOutput(outputTag);
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
              //4.Sink
     
    
- 
    
     
    
    
     
              result.print("正常的数据和迟到不严重的数据");
     
    
- 
    
     
    
    
     
              result2.print("迟到严重的数据");
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
              //5.execute
     
    
- 
    
     
    
    
     
              env.execute();
     
    
- 
    
     
    
    
     
          }
     
    
- 
    
     
    
    
     
          @Data
     
    
- 
    
     
    
    
     
          @AllArgsConstructor
     
    
- 
    
     
    
    
     
          @NoArgsConstructor
     
    
- 
    
     
    
    
     
          public static class Order {
     
    
- 
    
     
    
    
     
              private String orderId;
     
    
- 
    
     
    
    
     
              private Integer userId;
     
    
- 
    
     
    
    
     
              private Integer money;
     
    
- 
    
     
    
    
     
              private Long eventTime;
     
    
- 
    
     
    
    
     
          }
     
    
- 
    
     
    
    
     
      }
     
    
 
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116279739
        【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
            cloudbbs@huaweicloud.com
        
        
        
        
        
        
        - 点赞
- 收藏
- 关注作者
 
             
           
评论(0)