FlinkTable&SQL(六)
【摘要】 FlinkTable&SQL(六、七) 今日目标了解Flink Table&SQL发展历史了解为什么要使用Table API & SQL掌握Flink Table&SQL进行批处理开发掌握Flink Table&SQL进行流处理开发掌握常用的开发案例Flink-SQL的常用算子 Flink Table & SQLFlinkTable & SQL 是抽象级别更高的操作, 底层Flink Ru...
FlinkTable&SQL(六、七)
今日目标
- 了解Flink Table&SQL发展历史
- 了解为什么要使用Table API & SQL
- 掌握Flink Table&SQL进行批处理开发
- 掌握Flink Table&SQL进行流处理开发
- 掌握常用的开发案例
- Flink-SQL的常用算子
Flink Table & SQL
-
FlinkTable & SQL 是抽象级别更高的操作, 底层Flink Runtime => Stream 流程
-
批处理是流处理的一种特殊形态
-
FlinkSQL 遵循ANSI的SQL规范
-
Flink1.9之前, FlinkSQL包括两套Table api , DataStream Table API(流处理) ,DataSet Table API(批处理)
-
Planner 查询器, 抽象语法树,parser、optimizer、codegen(模板代码生成),最终生成 Flink Runtime 直接进行执行的代码
-
Planner包括old Planner 和 Blink Planner ,Blink Planner 底层实现了 流批一体(默认的Planner)
FlinkTable & SQL 程序结构
-
导入 pom 依赖, jar包坐标
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- flink执行计划,这是1.9版本之前的--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- blink执行计划,1.11+默认的--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency>
-
创建 FlinkTable FlinkSQL的 表的方式
// table is the result of a simple projection query Table projTable = tableEnv.from("X").select(...); // register the Table projTable as table "projectedTable" tableEnv.createTemporaryView("projectedTable", projTable);
-
SQL的四种语句
- DDL 数据定义语言, 创建数据库、表,删除数据库、表
- DML 数据操作语言, 对数据进行增、删、改操作
- DCL 数据控制语言, 对数据的操作权限进行设置 grant revoke
- DQL 数据查询语言,对数据表中的数据进行查询,基础查询,复杂查询,多表查询,子查询
-
需求
将两个数据流 DataStream 通过 FlinkTable & SQL API 进行 union all 操作,条件ds1 amount>2 union all ds2 amount<2
-
开发步骤
package cn.itcast.flink.sql; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import java.util.Arrays; import static org.apache.flink.table.api.Expressions.$; /** * Author itcast * Date 2021/6/22 9:45 * Desc TODO */ public class FlinkTableAPIDemo { public static void main(String[] args) throws Exception { //1.准备环境 创建流环境 和 流表环境,并行度设置为1 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //创建流表环境 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings); //2.Source 创建数据集 DataStream<Order> orderA = env.fromCollection(Arrays.asList( new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2))); DataStream<Order> orderB = env.fromCollection(Arrays.asList( new Order(2L, "pen", 3), new Order(2L, "rubber", 3), new Order(4L, "beer", 1))); //3.注册表 将数据流转换成表 // 通过fromDataStream将数据流转换成表 Table orderTableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount")); // 将数据流转换成 创建临时视图 tEnv.createTemporaryView("orderTableB",orderB,$("user"), $("product"), $("amount")); //4.执行查询,查询order1的amount>2并union all 上 order2的amoun<2的数据生成表 Table result = tEnv.sqlQuery("" + "select * from " + orderTableA + " where amount>2 " + "union all " + "select * from orderTableB where amount<2"); //4.1 将结果表转换成toAppendStream数据流 //字段的名称和类型 result.printSchema(); DataStream<Row> resultDS = tEnv.toAppendStream(result, Row.class); //5.打印结果 resultDS.print(); //6.执行环境 env.execute(); // 创建实体类 user:Long product:String amount:int } @Data @NoArgsConstructor @AllArgsConstructor public static class Order { public Long user; public String product; public int amount; } }
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)