2021年大数据Flink(三十二):Table与SQL案例准备 API

举报
Lansonli 发表于 2021/09/28 23:00:46 2021/09/28
2k+ 0 0
【摘要】 目录 API 获取环境 创建表 查询表 Table API SQL 写出表 与DataSet/DataStream集成 TableAPI SQLAPI API 获取环境 https://ci.apache.org/projects/flink/flink...

目录

API

获取环境

创建表

查询表

Table API

SQL

写出表

与DataSet/DataStream集成

TableAPI

SQLAPI


API

获取环境

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment


      // **********************
      // FLINK STREAMING QUERY
      // **********************
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
      StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
      // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
      // ******************
      // FLINK BATCH QUERY
      // ******************
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
      ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
      BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
      // **********************
      // BLINK STREAMING QUERY
      // **********************
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
      StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
      // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
      // ******************
      // BLINK BATCH QUERY
      // ******************
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.TableEnvironment;
      EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
      TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
  
 

创建表


      // Obtain a TableEnvironment first (see "Create a TableEnvironment" section).
      TableEnvironment tableEnv = ...;
      // Build a Table from a simple projection query over the registered table "X".
      Table projTable = tableEnv
          .from("X")
          .select(...);
      // Register projTable as a temporary view named "projectedTable".
      tableEnv.createTemporaryView("projectedTable", projTable);
  
 

      // Register a connector-backed table named "MyTable" via the descriptor API.
      // `tableEnv` is a TableEnvironment (see "Create a TableEnvironment" section);
      // the connector, format and schema descriptors are placeholders to fill in.
      tableEnv
        .connect(...)
        .withFormat(...)
        .withSchema(...)
        .inAppendMode()
        .createTemporaryTable("MyTable");
  
 

查询表

Table API


      // get a TableEnvironment
      TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

      // register Orders table

      // scan registered Orders table
      Table orders = tableEnv.from("Orders");

      // compute revenue for all customers from France.
      // NOTE: `$` is assumed to be statically imported from
      //   org.apache.flink.table.api.Expressions.
      Table revenue = orders
        .filter($("cCountry").isEqual("FRANCE"))
        .groupBy($("cID"), $("cName"))
        .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

      // emit or convert Table
      // execute query
  
 

SQL


      // Obtain a TableEnvironment (see "Create a TableEnvironment" section).
      TableEnvironment tableEnv = ...;
      // Precondition: a table named "Orders" must already be registered.
      // Revenue per customer, restricted to customers from France.
      String revenueQuery =
          "SELECT cID, cName, SUM(revenue) AS revSum " +
          "FROM Orders " +
          "WHERE cCountry = 'FRANCE' " +
          "GROUP BY cID, cName";
      Table revenue = tableEnv.sqlQuery(revenueQuery);
      // Next steps (not shown): emit or convert the Table, then execute the query.
  
 

      // Obtain a TableEnvironment (see "Create a TableEnvironment" section).
      TableEnvironment tableEnv = ...;
      // Preconditions: an "Orders" input table and a "RevenueFrance" output table
      // must already be registered.
      // Revenue per French customer, written into "RevenueFrance" with INSERT INTO.
      String insertStatement =
          "INSERT INTO RevenueFrance " +
          "SELECT cID, cName, SUM(revenue) AS revSum " +
          "FROM Orders " +
          "WHERE cCountry = 'FRANCE' " +
          "GROUP BY cID, cName";
      tableEnv.executeSql(insertStatement);
  
 

写出表


      // Obtain a TableEnvironment (see "Create a TableEnvironment" section).
      TableEnvironment tableEnv = ...;
      // Schema of the output table: a INT, b STRING, c BIGINT.
      final Schema sinkSchema = new Schema()
          .field("a", DataTypes.INT())
          .field("b", DataTypes.STRING())
          .field("c", DataTypes.BIGINT());
      // Pipe-delimited CSV format.
      final Csv csvFormat = new Csv().fieldDelimiter('|').deriveSchema();
      // Register a file-system-backed output table named "CsvSinkTable".
      tableEnv
          .connect(new FileSystem().path("/path/to/file"))
          .withFormat(csvFormat)
          .withSchema(sinkSchema)
          .createTemporaryTable("CsvSinkTable");
      // Compute a result Table using Table API operators and/or SQL queries.
      Table result = ...
      // Write the result Table into the registered sink table.
      result.executeInsert("CsvSinkTable");
  
 

与DataSet/DataStream集成

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api

  • Create a View from a DataStream or DataSet

      // Obtain a StreamTableEnvironment (see "Create a TableEnvironment" section);
      // registering a DataSet in a BatchTableEnvironment works the same way.
      StreamTableEnvironment tableEnv = ...;
      DataStream<Tuple2<Long, String>> stream = ...
      // View "myTable": fields keep the default tuple names "f0" and "f1".
      tableEnv.createTemporaryView("myTable", stream);
      // View "myTable2": fields are named "myLong" and "myString" instead.
      tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
  
 
  • Convert a DataStream or DataSet into a Table
    
            // get StreamTableEnvironment
            // (conversion of a DataSet in a BatchTableEnvironment is equivalent)
            StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
            DataStream<Tuple2<Long, String>> stream = ...;
            // Convert the DataStream into a Table with default fields "f0", "f1"
            Table table1 = tableEnv.fromDataStream(stream);
            // Convert the DataStream into a Table with fields "myLong", "myString"
            Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
        
       
  • Convert a Table into a DataStream or DataSet

Convert a Table into a DataStream

Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.

追加模式:只有当动态表仅通过插入(INSERT)更改进行修改时,才能使用此模式,即该表是仅追加的,并且之前发出的结果永远不会被更新。

Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.

撤回模式:此模式始终可用。它使用布尔标志对插入和删除更改进行编码。


      // Obtain a StreamTableEnvironment (see "Create a TableEnvironment" section).
      StreamTableEnvironment tableEnv = ...;
      // Input: a Table with two fields (String name, Integer age).
      Table table = ...

      // Append-only conversion to a DataStream<Row>; the target type is given as a class.
      DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

      // Append-only conversion to a DataStream<Tuple2<String, Integer>>;
      // the target type is given as TypeInformation.
      TupleTypeInfo<Tuple2<String, Integer>> tupleType =
          new TupleTypeInfo<>(Types.STRING(), Types.INT());
      DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);

      // Retract conversion: each element is a Tuple2<Boolean, Row> whose Boolean flag
      // marks the kind of change (true = INSERT, false = DELETE).
      DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
  
 

Convert a Table into a DataSet


      // get BatchTableEnvironment
      // NOTE(review): `env` is assumed to be a DataSet ExecutionEnvironment created
      //   earlier, e.g. ExecutionEnvironment.getExecutionEnvironment().
      BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
      // Table with two fields (String name, Integer age)
      Table table = ...
      // convert the Table into a DataSet of Row by specifying a class
      DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
      // convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
      TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
        Types.STRING(),
        Types.INT());
      DataSet<Tuple2<String, Integer>> dsTuple =
        tableEnv.toDataSet(table, tupleType);
  
 

TableAPI

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

SQLAPI

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116327559

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0)

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。