2021年大数据Flink(三十四):​​​​​​​Table与SQL ​​​​​​案例一

举报
Lansonli 发表于 2021/09/28 00:15:11 2021/09/28
【摘要】 ​​​​​​ 目录 ​​​​​​案例一 需求 代码实现 案例一 需求 将DataStream注册为Table和View并进行SQL统计   代码实现 package cn.itcast.sql; import lombok.AllArgsConstructor;import lombok.Data;imp...

​​​​​​

目录

​​​​​​案例一

需求

代码实现


案例一

需求

将DataStream注册为Table和View并进行SQL统计

 

代码实现


  
  1. package cn.itcast.sql;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.table.api.Table;
  8. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  9. import java.util.Arrays;
  10. import static org.apache.flink.table.api.Expressions.$;
  11. /**
  12.  * Author itcast
  13.  * Desc
  14.  */
  15. public class FlinkSQL_Table_Demo01 {
  16.     public static void main(String[] args) throws Exception {
  17.         //1.准备环境
  18.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  20.         //StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
  21.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  22.         //2.Source
  23.         DataStream<Order> orderA = env.fromCollection(Arrays.asList(
  24.                 new Order(1L, "beer", 3),
  25.                 new Order(1L, "diaper", 4),
  26.                 new Order(3L, "rubber", 2)));
  27.         DataStream<Order> orderB = env.fromCollection(Arrays.asList(
  28.                 new Order(2L, "pen", 3),
  29.                 new Order(2L, "rubber", 3),
  30.                 new Order(4L, "beer", 1)));
  31.         //3.注册表
  32.         // convert DataStream to Table
  33.         Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
  34.         // register DataStream as Table
  35.         tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount"));
  36.         //4.执行查询
  37.         System.out.println(tableA);
  38.         // union the two tables
  39.         Table resultTable = tEnv.sqlQuery(
  40.                 "SELECT * FROM " + tableA + " WHERE amount > 2 " +
  41.                 "UNION ALL " +
  42.                 "SELECT * FROM OrderB WHERE amount < 2"
  43.         );
  44.         //5.输出结果
  45.         DataStream<Order> resultDS = tEnv.toAppendStream(resultTable, Order.class);
  46.         resultDS.print();
  47.         env.execute();
  48.     }
  49.     @Data
  50.     @NoArgsConstructor
  51.     @AllArgsConstructor
  52.     public static class Order {
  53.         public Long user;
  54.         public String product;
  55.         public int amount;
  56.     }
  57. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116357228

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。