FlinkTable&SQL(六)

举报
Maynor学长 发表于 2022/07/24 11:48:20 2022/07/24
【摘要】 FlinkTable&SQL(六、七) 今日目标了解Flink Table&SQL发展历史了解为什么要使用Table API & SQL掌握Flink Table&SQL进行批处理开发掌握Flink Table&SQL进行流处理开发掌握常用的开发案例Flink-SQL的常用算子 Flink Table & SQLFlinkTable & SQL 是抽象级别更高的操作, 底层Flink Ru...

FlinkTable&SQL(六、七)

今日目标

  • 了解Flink Table&SQL发展历史
  • 了解为什么要使用Table API & SQL
  • 掌握Flink Table&SQL进行批处理开发
  • 掌握Flink Table&SQL进行流处理开发
  • 掌握常用的开发案例
  • Flink-SQL的常用算子

Flink Table & SQL

  • FlinkTable & SQL 是抽象级别更高的操作, 底层Flink Runtime => Stream 流程

  • 批处理是流处理的一种特殊形态

  • FlinkSQL 遵循ANSI的SQL规范

  • Flink1.9之前, FlinkSQL包括两套Table api , DataStream Table API(流处理) ,DataSet Table API(批处理)

  • Planner 查询器, 抽象语法树,parser、optimizer、codegen(模板代码生成),最终生成 Flink Runtime 直接进行执行的代码

  • Planner包括old Planner 和 Blink Planner ,Blink Planner 底层实现了 流批一体(默认的Planner)

image-20210622091627064

FlinkTable & SQL 程序结构

  • 导入 pom 依赖, jar包坐标

    <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- flink执行计划,这是1.9版本之前的-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- blink执行计划,1.11+默认的-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
  • image-20210622094025542

  • 创建 FlinkTable FlinkSQL的 表的方式

    // table is the result of a simple projection query 
    Table projTable = tableEnv.from("X").select(...);
    
    // register the Table projTable as table "projectedTable"
    tableEnv.createTemporaryView("projectedTable", projTable);
    
  • SQL的四种语句

    1. DDL 数据定义语言, 创建数据库、表,删除数据库、表
    2. DML 数据操作语言, 对数据进行增、删、改操作
    3. DCL 数据控制语言, 对数据的操作权限进行设置 grant revoke
    4. DQL 数据查询语言,对数据表中的数据进行查询,基础查询,复杂查询,多表查询,子查询
  • 需求

    将两个数据流 DataStream 通过 FlinkTable & SQL API 进行 union all 操作,条件ds1 amount>2 union all ds2 amount<2

  • 开发步骤

    package cn.itcast.flink.sql;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import java.util.Arrays;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    /**
     * Author itcast
     * Date 2021/6/22 9:45
     * Desc TODO
     */
    public class FlinkTableAPIDemo {
        public static void main(String[] args) throws Exception {
            //1.准备环境 创建流环境 和 流表环境,并行度设置为1
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            //创建流表环境
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
            //2.Source 创建数据集
            DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                    new Order(1L, "beer", 3),
                    new Order(1L, "diaper", 4),
                    new Order(3L, "rubber", 2)));
    
            DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                    new Order(2L, "pen", 3),
                    new Order(2L, "rubber", 3),
                    new Order(4L, "beer", 1)));
            //3.注册表 将数据流转换成表
            // 通过fromDataStream将数据流转换成表
            Table orderTableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
            // 将数据流转换成 创建临时视图
            tEnv.createTemporaryView("orderTableB",orderB,$("user"), $("product"), $("amount"));
            //4.执行查询,查询order1的amount>2并union all 上 order2的amoun<2的数据生成表
            Table result = tEnv.sqlQuery("" +
                    "select * from " + orderTableA + " where amount>2 " +
                    "union all " +
                    "select * from orderTableB where amount<2");
            //4.1 将结果表转换成toAppendStream数据流
            //字段的名称和类型
            result.printSchema();
            DataStream<Row> resultDS = tEnv.toAppendStream(result, Row.class);
            //5.打印结果
            resultDS.print();
            //6.执行环境
            env.execute();
            // 创建实体类 user:Long product:String amount:int
        }
        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class Order {
            public Long user;
            public String product;
            public int amount;
        }
    }
    
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。