2021年大数据Flink(三十五):​​​​​​​Table与SQL ​​​​​​案例二

举报
Lansonli 发表于 2021/09/27 23:43:13 2021/09/27
【摘要】 目录 案例二 需求 代码实现-SQL 代码实现-Table 案例二 需求 使用SQL和Table两种方式对DataStream中的单词进行统计 代码实现-SQL package cn.itcast.sql; import lombok.AllArgsConstructor;import lombok.Data;imp...

目录

案例二

需求

代码实现-SQL

代码实现-Table


案例二

需求

使用SQL和Table两种方式对DataStream中的单词进行统计

代码实现-SQL


  
  1. package cn.itcast.sql;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.table.api.Table;
  9. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  10. import static org.apache.flink.table.api.Expressions.$;
  11. /**
  12.  * Author itcast
  13.  * Desc
  14.  */
  15. public class FlinkSQL_Table_Demo02 {
  16.     public static void main(String[] args) throws Exception {
  17.         //1.准备环境
  18.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  20.         //2.Source
  21.         DataStream<WC> input = env.fromElements(
  22.                 new WC("Hello", 1),
  23.                 new WC("World", 1),
  24.                 new WC("Hello", 1)
  25.         );
  26.         //3.注册表
  27.         tEnv.createTemporaryView("WordCount", input, $("word"), $("frequency"));
  28.         //4.执行查询
  29.         Table resultTable = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
  30.         //5.输出结果
  31.         //toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate
  32.         //DataStream<WC> resultDS = tEnv.toAppendStream(resultTable, WC.class);
  33.         DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
  34.         resultDS.print();
  35.         env.execute();
  36.     }
  37.     @Data
  38.     @NoArgsConstructor
  39.     @AllArgsConstructor
  40.     public static class WC {
  41.         public String word;
  42.         public long frequency;
  43.     }
  44. }

 

代码实现-Table


  
  1. package cn.itcast.sql;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.table.api.Table;
  9. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  10. import static org.apache.flink.table.api.Expressions.$;
  11. /**
  12.  * Author itcast
  13.  * Desc
  14.  */
  15. public class FlinkSQL_Table_Demo03 {
  16.     public static void main(String[] args) throws Exception {
  17.         //1.准备环境
  18.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  20.         //2.Source
  21.         DataStream<WC> input = env.fromElements(
  22.                 new WC("Hello", 1),
  23.                 new WC("World", 1),
  24.                 new WC("Hello", 1)
  25.         );
  26.         //3.注册表
  27.         Table table = tEnv.fromDataStream(input);
  28.         //4.执行查询
  29.         Table resultTable = table
  30.                 .groupBy($("word"))
  31.                 .select($("word"), $("frequency").sum().as("frequency"))
  32.                 .filter($("frequency").isEqual(2));
  33.         //5.输出结果
  34.         DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
  35.         resultDS.print();
  36.         env.execute();
  37.     }
  38.     @Data
  39.     @NoArgsConstructor
  40.     @AllArgsConstructor
  41.     public static class WC {
  42.         public String word;
  43.         public long frequency;
  44.     }
  45. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116357527

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。