2021 Big Data Flink (36): Table and SQL, Case 3

Lansonli, posted 2021/09/29 00:53:15
Contents

Case 3

Requirement

Coding Steps

Implementation: Approach 1

Implementation: Approach 2


Case 3

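Requirement

Use Flink SQL to compute, per 5-second interval, each user's total order count, maximum order amount, and minimum order amount.

In other words, every 5 seconds, compute those three figures for each user over the most recent 5 seconds.

Because the window length and the evaluation interval are both 5 seconds, this is exactly a time-based tumbling window from stream processing.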
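To make the window semantics concrete, here is a minimal plain-Java sketch with no Flink involved. The names `TumblingWindowSketch`, `Order`, `Stats`, `windowStart`, and `aggregate` are made up for illustration and are not part of the original article or of Flink. It assumes non-negative millisecond timestamps and a zero window offset, and it aggregates a finished batch rather than a live stream, so it ignores watermarks and lateness.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration (no Flink) of per-user 5-second tumbling windows. */
class TumblingWindowSketch {

    /** Hypothetical order record: user, amount, event time in milliseconds. */
    static final class Order {
        final int userId;
        final int money;
        final long createTime;

        Order(int userId, int money, long createTime) {
            this.userId = userId;
            this.money = money;
            this.createTime = createTime;
        }
    }

    /** Hypothetical holder for the three aggregates named in the requirement. */
    static final class Stats {
        long totalCount = 0;
        int maxMoney = Integer.MIN_VALUE;
        int minMoney = Integer.MAX_VALUE;

        void add(int money) {
            totalCount++;
            maxMoney = Math.max(maxMoney, money);
            minMoney = Math.min(minMoney, money);
        }

        @Override
        public String toString() {
            return "count=" + totalCount + ", max=" + maxMoney + ", min=" + minMoney;
        }
    }

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    /** Groups orders by (window start, userId) and aggregates each group. */
    static Map<String, Stats> aggregate(Order[] orders, long sizeMillis) {
        Map<String, Stats> result = new TreeMap<>();
        for (Order o : orders) {
            String key = windowStart(o.createTime, sizeMillis) + "/user" + o.userId;
            result.computeIfAbsent(key, k -> new Stats()).add(o.money);
        }
        return result;
    }

    public static void main(String[] args) {
        Order[] orders = {
            new Order(1, 30, 1_000L),
            new Order(1, 80, 4_999L),
            new Order(2, 10, 2_500L),
            new Order(1, 55, 5_000L) // falls into the next window [5000, 10000)
        };
        aggregate(orders, 5_000L).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Every timestamp belongs to exactly one window, which is what distinguishes a tumbling window from a sliding one.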

The implementation below uses the Flink Table & SQL API.

Coding Steps

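1. Create the environment

2. Simulate a real-time data stream with a custom source function

3. Assign event time and watermarks

4. Register the table

5. Run the query, in either SQL style or Table API style (for awareness)

6. Output the results

7. Trigger execution

Implementation: Approach 1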


  
package cn.itcast.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Author itcast
 * Desc Per-user order statistics over 5-second tumbling windows, SQL style
 */
public class FlinkSQL_Table_Demo04 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Source: emit one random order per second
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            // volatile because cancel() is called from a different thread than run()
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    // userId in [0, 2], money in [0, 100], createTime = current time in ms
                    Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 3. Transformation: use createTime as event time, tolerate 2 seconds of out-of-order data
        DataStream<Order> watermarkDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, timestamp) -> event.getCreateTime())
                );

        // 4. Register the table; createTime becomes the rowtime (event-time) attribute
        tEnv.createTemporaryView("t_order", watermarkDS,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());

        // 5. Run the SQL: group by user and by 5-second tumbling window
        String sql = "select " +
                "userId," +
                "count(*) as totalCount," +
                "max(money) as maxMoney," +
                "min(money) as minMoney " +
                "from t_order " +
                "group by userId," +
                "tumble(createTime, interval '5' second)";
        Table resultTable = tEnv.sqlQuery(sql);

        // 6. Sink: convert the SQL result to a DataStream and print it
        // toAppendStream: appends result rows to the DataStream (insert-only results)
        // toRetractStream: each record carries a flag, true = row added, false = earlier row retracted
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        // 7. Trigger execution
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}
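Step 3 of the listing above uses `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))`. The following plain-Java sketch (no Flink dependency) illustrates the arithmetic behind it as I understand Flink's built-in generator: the watermark trails the largest timestamp seen so far by the allowed delay plus one millisecond, and an event-time window `[start, end)` can fire once the watermark reaches `end - 1`. The class and method names (`WatermarkSketch`, `onEvent`, `currentWatermark`, `canFire`) are made up for illustration, and the sketch is a simplification, not Flink's actual implementation.

```java
/** Plain-Java illustration (no Flink) of bounded-out-of-orderness watermarks. */
class WatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Records an event; a late (smaller) timestamp never moves the maximum backwards. */
    void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    /** Watermark = largest timestamp seen - allowed delay - 1 ms. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis - 1;
    }

    /** A window [start, end) may fire once the watermark reaches end - 1. */
    static boolean canFire(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(2_000L); // 2 seconds, as in the listing
        long windowEnd = 5_000L;                          // the window [0, 5000)
        long[] eventTimes = {1_000L, 6_000L, 4_000L, 7_000L};
        for (long t : eventTimes) {
            wm.onEvent(t);
            System.out.println("event=" + t
                    + " watermark=" + wm.currentWatermark()
                    + " canFire=" + canFire(windowEnd, wm.currentWatermark()));
        }
    }
}
```

In this run the event at 4000 arrives after the event at 6000, yet it still lands in the window `[0, 5000)` because that window does not fire until an event with timestamp 7000 or later pushes the watermark to 4999.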

 

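toAppendStream: converts the table to a DataStream by appending each result row. It applies only when the query result is insert-only.

toRetractStream: converts the table to a DataStream of Tuple2<Boolean, Row> records. The flag is true when a row is added to the result and false when a previously emitted row is retracted (deleted); an update arrives as a retraction followed by an addition.

Implementation: Approach 2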


  
package cn.itcast.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Author itcast
 * Desc Per-user order statistics over 5-second tumbling windows, Table API style
 */
public class FlinkSQL_Table_Demo05 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Source: emit one random order per second
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            // volatile because cancel() is called from a different thread than run()
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    // userId in [0, 2], money in [0, 100], createTime = current time in ms
                    Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 3. Transformation: use createTime as event time, tolerate 2 seconds of out-of-order data
        DataStream<Order> watermarkDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, timestamp) -> event.getCreateTime())
                );

        // 4. Register the table; createTime becomes the rowtime (event-time) attribute
        tEnv.createTemporaryView("t_order", watermarkDS,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());
        // Print the table schema
        tEnv.from("t_order").printSchema();

        // 5. Table API query: 5-second tumbling window, grouped by window and user
        Table resultTable = tEnv.from("t_order")
                // Older string-based form: .window(Tumble.over("5.second").on("createTime").as("tumbleWindow"))
                .window(Tumble.over(lit(5).second())
                        .on($("createTime"))
                        .as("tumbleWindow"))
                .groupBy($("tumbleWindow"), $("userId"))
                .select(
                        $("userId"),
                        $("userId").count().as("totalCount"),
                        $("money").max().as("maxMoney"),
                        $("money").min().as("minMoney"));

        // 6. Sink: convert the query result to a DataStream and print it
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        // 7. Trigger execution
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}
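Both listings convert the result table with `toRetractStream`, which yields `Tuple2<Boolean, Row>` records. With the windowed query used here, each (window, user) result should normally be emitted once when the window closes, so the printed flags are expected to be `true`. To show what a `false` record means, the plain-Java sketch below (no Flink) uses a different, non-windowed running count per key. The types and methods in it (`Change`, `runningCount`, `materialize`) are hypothetical stand-ins invented for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration (no Flink) of add/retract change records. */
class RetractStreamSketch {

    /** Hypothetical stand-in for Tuple2<Boolean, Row>: a flag plus a (key, value) row. */
    static final class Change {
        final boolean isAdd;
        final String key;
        final long value;

        Change(boolean isAdd, String key, long value) {
            this.isAdd = isAdd;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + isAdd + ", " + key + ", " + value + ")";
        }
    }

    /** Emits the change records that a running count per key produces. */
    static List<Change> runningCount(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            if (old != null) {
                out.add(new Change(false, key, old)); // retract the outdated result
            }
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(key, updated);
            out.add(new Change(true, key, updated));  // add the new result
        }
        return out;
    }

    /** Replays change records to rebuild the current result. */
    static Map<String, Long> materialize(List<Change> changes) {
        Map<String, Long> view = new HashMap<>();
        for (Change c : changes) {
            if (c.isAdd) {
                view.put(c.key, c.value);
            } else {
                view.remove(c.key, c.value); // removes only if the stored value matches
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Change> changes = runningCount(Arrays.asList("user1", "user2", "user1"));
        changes.forEach(System.out::println);
        System.out.println(materialize(changes));
    }
}
```

The second order for `user1` produces two records: a retraction of the old count followed by an addition of the new one. A consumer that replays the records in order always ends up with the latest result per key.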

 

Source: lansonli.blog.csdn.net, author: Lansonli. Copyright belongs to the original author; contact the author before reposting.

Original link: lansonli.blog.csdn.net/article/details/116357554
