2021年大数据Flink(三十四):​​​​​​​Table与SQL ​​​​​​案例一

举报
Lansonli 发表于 2021/09/28 00:15:11 2021/09/28
2.1k+ 0 0
【摘要】 ​​​​​​ 目录 ​​​​​​案例一 需求 代码实现 案例一 需求 将DataStream注册为Table和View并进行SQL统计   代码实现 package cn.itcast.sql; import lombok.AllArgsConstructor;import lombok.Data;imp...

​​​​​​

目录

​​​​​​案例一

需求

代码实现


案例一

需求

将DataStream注册为Table和View并进行SQL统计

代码实现


      package cn.itcast.sql;
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import java.util.Arrays;
      import static org.apache.flink.table.api.Expressions.$;
      /**
       * Author itcast
       * Desc
       */
      public class FlinkSQL_Table_Demo01 {
          public static void main(String[] args) throws Exception {
              //1.准备环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
              //StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
              StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
              //2.Source
              DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                      new Order(1L, "beer", 3),
                      new Order(1L, "diaper", 4),
                      new Order(3L, "rubber", 2)));
              DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                      new Order(2L, "pen", 3),
                      new Order(2L, "rubber", 3),
                      new Order(4L, "beer", 1)));
              //3.注册表
              // convert DataStream to Table
              Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
              // register DataStream as Table
              tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount"));
              //4.执行查询
              System.out.println(tableA);
              // union the two tables
              Table resultTable = tEnv.sqlQuery(
                      "SELECT * FROM " + tableA + " WHERE amount > 2 " +
                      "UNION ALL " +
                      "SELECT * FROM OrderB WHERE amount < 2"
              );
              //5.输出结果
              DataStream<Order> resultDS = tEnv.toAppendStream(resultTable, Order.class);
              resultDS.print();
              env.execute();
          }
          @Data
          @NoArgsConstructor
          @AllArgsConstructor
          public static class Order {
              public Long user;
              public String product;
              public int amount;
          }
      }
  
 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116357228

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。