2021 Big Data Flink (Part 23): Watermark Case Demo

Lansonli, posted 2021/09/28 22:43:55

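Contents

Watermark Case Demo

Requirements

API

Code Implementation 1: Development Version (Must Know)

Code Implementation 2: Verification Version (For Reference)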


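Watermark Case Demo

Requirements

The order data has the format: (order ID, user ID, order amount, timestamp/event time)

Every 5 seconds, compute each user's total order amount over the preceding 5 seconds.

Add a Watermark to handle a bounded amount of data lateness and out-of-order arrival.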
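
"Every 5 seconds, over 5 seconds" describes a tumbling window: each event belongs to exactly one window [start, start + 5s). As a minimal sketch in plain Java, assuming windows are aligned to the epoch with no offset, the window for an event timestamp can be computed as below. The class and method names are hypothetical and are not part of Flink's API.

```java
/** Illustration only: maps an event timestamp to its tumbling window. Not Flink code. */
final class TumblingWindowSketch {

    /**
     * Returns {start, end} of the tumbling window that contains timestampMs.
     * The window is half-open: [start, end). Assumes a zero window offset.
     */
    static long[] windowFor(long timestampMs, long windowSizeMs) {
        // floorMod keeps the remainder non-negative, so start <= timestampMs always holds.
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        return new long[] {start, start + windowSizeMs};
    }

    public static void main(String[] args) {
        long[] w = windowFor(12_345L, 5_000L);
        System.out.println("[" + w[0] + ", " + w[1] + ")"); // prints [10000, 15000)
    }
}
```

An event stamped exactly on a boundary (for example 15000 ms with a 5000 ms window) falls into the next window, because the end of each window is exclusive.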

 

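API

Note: in practice we usually just use the BoundedOutOfOrdernessTimestampExtractor that Flink provides. From Flink 1.11 onward, the equivalent is WatermarkStrategy.forBoundedOutOfOrderness, which is what the code below uses.

Code Implementation 1: Development Version (Must Know)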

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html
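
The rule behind bounded-out-of-orderness watermarks is: Watermark = largest event time seen so far - maximum allowed out-of-orderness. The sketch below restates that rule in plain Java without any Flink dependency. It also includes the extra "- 1" millisecond that appears in the hand-written generator in the verification version of this article. The class name `BoundedOutOfOrdernessSketch` is hypothetical; treat this as an illustration of the arithmetic, not as Flink's implementation.

```java
/** Illustration only: tracks the largest event time and derives a watermark from it. */
final class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the first watermark equals Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Called for every event; an older (out-of-order) event never lowers the maximum. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark = largest event time seen - allowed out-of-orderness - 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(3_000L);
        for (long ts : new long[] {10_000L, 13_000L, 11_000L}) {
            wm.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + wm.currentWatermark());
        }
        // prints:
        // event=10000 watermark=6999
        // event=13000 watermark=9999
        // event=11000 watermark=9999
    }
}
```

The third event arrives out of order (11000 after 13000) and leaves the watermark unchanged, which is why a watermark never moves backwards.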


  
package cn.itcast.watermaker;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Desc
 * Simulates real-time order data in the format (order ID, user ID, order amount, timestamp/event time).
 * Every 5s, computes each user's total order amount over the last 5s (a time-based tumbling window),
 * and adds a Watermark to handle a bounded amount of data lateness and out-of-order arrival.
 */
public class WatermakerDemo01_Develop {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //Simulate real-time order data (with late and out-of-order records)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            //volatile: cancel() is called from a different thread than run()
            private volatile boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //Simulate lateness and disorder: shift the event time back by 0 to 4 seconds
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //3.Transformation
        //-Tell Flink to compute based on event time
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//event time is already the default in newer versions
        //-Tell Flink which field holds the event time, because
        // Watermark = largest event time seen so far - maximum allowed lateness/out-of-orderness
        /*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(3)) {//maximum allowed lateness/out-of-orderness
                    @Override
                    public long extractTimestamp(Order element) {
                        return element.eventTime;
                        //Specify which field is the event time; Flink then computes internally:
                        //Watermark = largest event time seen so far - maximum allowed lateness/out-of-orderness
                    }
        });*/
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );
        //From this point on the stream carries Watermarks, so window computation can follow
        //Every 5s, compute each user's total order amount over the last 5s (time-based tumbling window)
        DataStream<Order> result = watermakerDS
                .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}

Code Implementation 2: Verification Version (For Reference)


  
package cn.itcast.watermaker;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Desc
 * Simulates real-time order data in the format (order ID, user ID, order amount, timestamp/event time).
 * Every 5s, computes each user's total order amount over the last 5s (a time-based tumbling window),
 * and adds a Watermark to handle a bounded amount of data lateness and out-of-order arrival.
 */
public class WatermakerDemo02_Check {
    public static void main(String[] args) throws Exception {
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //Simulate real-time order data (with late and out-of-order records)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            //volatile: cancel() is called from a different thread than run()
            private volatile boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //Simulate lateness and disorder: shift the event time back by 0 to 4 seconds
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    System.out.println("Sent: " + userId + " : " + df.format(eventTime));
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //3.Transformation
        /*DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );*/
        //In real development, just use the version above
        //For learning and testing, you can implement the generator yourself
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        new WatermarkStrategy<Order>() {
                            @Override
                            public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                return new WatermarkGenerator<Order>() {
                                    //userId and eventTime of the most recent event, kept only for logging
                                    private int userId = 0;
                                    private long eventTime = 0L;
                                    private final long outOfOrdernessMillis = 3000;
                                    //Start low enough that the first Watermark does not underflow
                                    private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
                                    @Override
                                    public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                        userId = event.userId;
                                        eventTime = event.eventTime;
                                        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                                    }
                                    @Override
                                    public void onPeriodicEmit(WatermarkOutput output) {
                                        //Watermark = largest event time seen so far - maximum allowed lateness/out-of-orderness
                                        Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                        System.out.println("key:" + userId + ", system time:" + df.format(System.currentTimeMillis()) + ", event time:" + df.format(eventTime) + ", watermark:" + df.format(watermark.getTimestamp()));
                                        output.emitWatermark(watermark);
                                    }
                                };
                            }
                        }.withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );
        //From this point on the stream carries Watermarks, so window computation can follow
        //Every 5s, compute each user's total order amount over the last 5s (time-based tumbling window)
       /* DataStream<Order> result = watermakerDS
                 .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");*/
        //In real development, use the code above for the business computation
        //For learning and testing, the code below prints more detail, such as the event times
        //of the records in each window at the moment the window fires
        DataStream<String> result = watermakerDS
                .keyBy(Order::getUserId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //Apply the function passed to apply() to the records in each window
                //WindowFunction<IN, OUT, KEY, W extends Window>
                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
                        //Collect the event times of the records that belong to this window
                        List<String> eventTimeList = new ArrayList<>();
                        for (Order order : input) {
                            Long eventTime = order.eventTime;
                            eventTimeList.add(df.format(eventTime));
                        }
                        String outStr = String.format("key:%s, window:[%s~%s), event times in this window:%s",
                                key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);
                        out.collect(outStr);
                    }
                });
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
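
When reading the output of the verification version, it helps to know when a window should fire and which records will be missing from it. An event-time window [start, end) fires once the Watermark reaches end - 1 ms. With the default allowed lateness of zero, a record that arrives after its window has fired is dropped. The plain-Java sketch below replays a list of event timestamps under those rules. It is a simplification with hypothetical names: it ignores keys and updates the Watermark after every event, whereas Flink emits Watermarks periodically, so real firing happens slightly later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustration only: replays event times through 5s tumbling windows with 3s out-of-orderness. */
final class WindowFiringSketch {
    static final long WINDOW_SIZE_MS = 5_000L;
    static final long OUT_OF_ORDERNESS_MS = 3_000L;

    /** Returns a log with one entry per fired window and per dropped late event. */
    static List<String> replay(long[] eventTimestamps) {
        List<String> log = new ArrayList<>();
        // Open windows keyed by window start; each holds the event times assigned to it.
        TreeMap<Long, List<Long>> openWindows = new TreeMap<>();
        long maxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MS + 1;
        long watermark = Long.MIN_VALUE;

        for (long ts : eventTimestamps) {
            long start = ts - Math.floorMod(ts, WINDOW_SIZE_MS);
            long lastMillisOfWindow = start + WINDOW_SIZE_MS - 1;
            // Late: the Watermark has already passed the end of this event's window.
            if (lastMillisOfWindow <= watermark) {
                log.add("dropped late event " + ts);
                continue;
            }
            openWindows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);

            // Advance the Watermark: largest event time - out-of-orderness - 1 ms.
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - OUT_OF_ORDERNESS_MS - 1;

            // Fire every open window whose last millisecond is at or before the Watermark.
            while (!openWindows.isEmpty()
                    && openWindows.firstKey() + WINDOW_SIZE_MS - 1 <= watermark) {
                Map.Entry<Long, List<Long>> w = openWindows.pollFirstEntry();
                log.add("fired [" + w.getKey() + "," + (w.getKey() + WINDOW_SIZE_MS)
                        + ") with " + w.getValue());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        replay(new long[] {1_000, 4_000, 6_000, 3_000, 8_000, 2_000})
                .forEach(System.out::println);
        // prints:
        // fired [0,5000) with [1000, 4000, 3000]
        // dropped late event 2000
    }
}
```

In this replay the out-of-order event at 3000 still lands in window [0,5000), because the Watermark is only 2999 when it arrives. The event at 8000 pushes the Watermark to 4999, which fires the window, so the event at 2000 that follows is too late.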

 

Source: lansonli.blog.csdn.net. Author: Lansonli. Copyright belongs to the original author; please contact the author before reposting.

Original link: lansonli.blog.csdn.net/article/details/116279701
