2021年大数据Flink(二十四):​​​​​​​Allowed Lateness案例演示

举报
Lansonli 发表于 2021/09/28 23:41:49 2021/09/28
【摘要】 Allowed Lateness案例演示 需求 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) 要求每隔5s,计算5秒内,每个用户的订单总金额 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。 并使用OutputTag+allowedLateness解决数据丢失问题 ​​​​​​...

Allowed Lateness案例演示

需求

有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s,计算5秒内,每个用户的订单总金额

并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。

并使用OutputTag+allowedLateness解决数据丢失问题

​​​​​​​API

 


  
  1. package cn.itcast.watermaker;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  6. import org.apache.flink.api.common.typeinfo.TypeInformation;
  7. import org.apache.flink.streaming.api.datastream.DataStream;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  12. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  13. import org.apache.flink.streaming.api.windowing.time.Time;
  14. import org.apache.flink.util.OutputTag;
  15. import java.time.Duration;
  16. import java.util.Random;
  17. import java.util.UUID;
  18. /**
  19.  * Author itcast
  20.  * Desc
  21.  * 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
  22.  * 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
  23.  * 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
  24.  */
  25. public class WatermakerDemo03_AllowedLateness {
  26.     public static void main(String[] args) throws Exception {
  27.         //1.env
  28.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  29.         //2.Source
  30.         //模拟实时订单数据(数据有延迟和乱序)
  31.         DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
  32.             private boolean flag = true;
  33.             @Override
  34.             public void run(SourceContext<Order> ctx) throws Exception {
  35.                 Random random = new Random();
  36.                 while (flag) {
  37.                     String orderId = UUID.randomUUID().toString();
  38.                     int userId = random.nextInt(3);
  39.                     int money = random.nextInt(100);
  40.                     //模拟数据延迟和乱序!
  41.                     long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
  42.                     ctx.collect(new Order(orderId, userId, money, eventTime));
  43.                     //TimeUnit.SECONDS.sleep(1);
  44.                 }
  45.             }
  46.             @Override
  47.             public void cancel() {
  48.                 flag = false;
  49.             }
  50.         });
  51.         //3.Transformation
  52.         DataStream<Order> watermakerDS = orderDS
  53.                 .assignTimestampsAndWatermarks(
  54.                         WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  55.                                 .withTimestampAssigner((event, timestamp) -> event.getEventTime())
  56.                 );
  57.         //代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
  58.         //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
  59.         OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class));
  60.         SingleOutputStreamOperator<Order> result = watermakerDS
  61.                 .keyBy(Order::getUserId)
  62.                 //.timeWindow(Time.seconds(5), Time.seconds(5))
  63.                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  64.                 .allowedLateness(Time.seconds(5))
  65.                 .sideOutputLateData(outputTag)
  66.                 .sum("money");
  67.         DataStream<Order> result2 = result.getSideOutput(outputTag);
  68.         //4.Sink
  69.         result.print("正常的数据和迟到不严重的数据");
  70.         result2.print("迟到严重的数据");
  71.         //5.execute
  72.         env.execute();
  73.     }
  74.     @Data
  75.     @AllArgsConstructor
  76.     @NoArgsConstructor
  77.     public static class Order {
  78.         private String orderId;
  79.         private Integer userId;
  80.         private Integer money;
  81.         private Long eventTime;
  82.     }
  83. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116279739

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。