2021 Big Data Flink (32): Table and SQL Case Preparation: API

Lansonli, posted on 2021/09/28 23:00:46

Contents

API

Obtaining the Environment

Creating Tables

Querying Tables

Table API

SQL

Writing Tables Out

Integration with DataSet/DataStream

Table API

SQL API


API

Obtaining the Environment

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment

 


  
    // **********************
    // FLINK STREAMING QUERY
    // **********************
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
    // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

    // ******************
    // FLINK BATCH QUERY
    // ******************
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

    ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

    // **********************
    // BLINK STREAMING QUERY
    // **********************
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
    // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

    // ******************
    // BLINK BATCH QUERY
    // ******************
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

Creating Tables


  
    // get a TableEnvironment
    TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

    // table is the result of a simple projection query
    Table projTable = tableEnv.from("X").select(...);

    // register the Table projTable as table "projectedTable"
    tableEnv.createTemporaryView("projectedTable", projTable);

    // register a table through a connector descriptor
    tableEnvironment
      .connect(...)
      .withFormat(...)
      .withSchema(...)
      .inAppendMode()
      .createTemporaryTable("MyTable");

Querying Tables

Table API


  
    // $ is the static helper from the Table API expression DSL
    import static org.apache.flink.table.api.Expressions.$;

    // get a TableEnvironment
    TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

    // register Orders table

    // scan registered Orders table
    Table orders = tableEnv.from("Orders");

    // compute revenue for all customers from France
    Table revenue = orders
      .filter($("cCountry").isEqual("FRANCE"))
      .groupBy($("cID"), $("cName"))
      .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

    // emit or convert Table
    // execute query

SQL


  
    // get a TableEnvironment
    TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

    // register Orders table

    // compute revenue for all customers from France
    Table revenue = tableEnv.sqlQuery(
        "SELECT cID, cName, SUM(revenue) AS revSum " +
        "FROM Orders " +
        "WHERE cCountry = 'FRANCE' " +
        "GROUP BY cID, cName"
      );

    // emit or convert Table
    // execute query
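The Table API and SQL queries above describe the same computation: keep the orders whose cCountry is FRANCE, group them by cID and cName, and sum revenue per group. As a rough illustration of that logic only, here is a plain-Java sketch over an in-memory list. It does not use Flink. The `RevenueQuerySketch` class, the `Order` type, and the sample rows are assumptions made up for this example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is not Flink API.
class RevenueQuerySketch {

    // Hypothetical row type mirroring the columns used by the query.
    static final class Order {
        final long cID;
        final String cName;
        final String cCountry;
        final long revenue;

        Order(long cID, String cName, String cCountry, long revenue) {
            this.cID = cID;
            this.cName = cName;
            this.cCountry = cCountry;
            this.revenue = revenue;
        }
    }

    // Mirrors: SELECT cID, cName, SUM(revenue) AS revSum
    //          FROM Orders WHERE cCountry = 'FRANCE' GROUP BY cID, cName
    static Map<List<Object>, Long> revenueFrance(List<Order> orders) {
        Map<List<Object>, Long> revSum = new LinkedHashMap<>();
        for (Order o : orders) {
            if (!"FRANCE".equals(o.cCountry)) {
                continue; // WHERE cCountry = 'FRANCE'
            }
            List<Object> key = Arrays.<Object>asList(o.cID, o.cName); // GROUP BY cID, cName
            revSum.merge(key, o.revenue, Long::sum);                  // SUM(revenue)
        }
        return revSum;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order(1, "Alice", "FRANCE", 10),
            new Order(2, "Bob", "GERMANY", 20),
            new Order(1, "Alice", "FRANCE", 20),
            new Order(3, "Chloe", "FRANCE", 5));
        System.out.println(revenueFrance(orders));
        // prints {[1, Alice]=30, [3, Chloe]=5}
    }
}
```

In Flink the same result is produced incrementally over a stream or in one pass over a bounded input, depending on the environment; the sketch only shows what the final grouped sums look like.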

  
    // get a TableEnvironment
    TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

    // register "Orders" table
    // register "RevenueFrance" output table

    // compute revenue for all customers from France and emit to "RevenueFrance"
    tableEnv.executeSql(
        "INSERT INTO RevenueFrance " +
        "SELECT cID, cName, SUM(revenue) AS revSum " +
        "FROM Orders " +
        "WHERE cCountry = 'FRANCE' " +
        "GROUP BY cID, cName"
      );

Writing Tables Out


  
    // get a TableEnvironment
    TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

    // create an output Table
    final Schema schema = new Schema()
        .field("a", DataTypes.INT())
        .field("b", DataTypes.STRING())
        .field("c", DataTypes.BIGINT());

    tableEnv.connect(new FileSystem().path("/path/to/file"))
        .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
        .withSchema(schema)
        .createTemporaryTable("CsvSinkTable");

    // compute a result Table using Table API operators and/or SQL queries
    Table result = ...;

    // emit the result Table to the registered TableSink
    result.executeInsert("CsvSinkTable");

Integration with DataSet/DataStream

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api

 

 

  • Create a View from a DataStream or DataSet

  
    // get StreamTableEnvironment
    // registration of a DataSet in a BatchTableEnvironment is equivalent
    StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

    DataStream<Tuple2<Long, String>> stream = ...;

    // register the DataStream as View "myTable" with fields "f0", "f1"
    tableEnv.createTemporaryView("myTable", stream);

    // register the DataStream as View "myTable2" with fields "myLong", "myString"
    tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
  • Convert a DataStream or DataSet into a Table
    
        
    // get StreamTableEnvironment
    // registration of a DataSet in a BatchTableEnvironment is equivalent
    StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

    DataStream<Tuple2<Long, String>> stream = ...;

    // Convert the DataStream into a Table with default fields "f0", "f1"
    Table table1 = tableEnv.fromDataStream(stream);

    // Convert the DataStream into a Table with fields "myLong", "myString"
    Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));

     

  • Convert a Table into a DataStream or DataSet

Convert a Table into a DataStream

Append Mode: This mode can only be used if the dynamic Table is modified solely by INSERT changes, i.e., it is append-only and previously emitted results are never updated.

Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag; an update is emitted as a DELETE of the previous row followed by an INSERT of the new row.


  
    // get StreamTableEnvironment
    StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

    // Table with two fields (String name, Integer age)
    Table table = ...;

    // convert the Table into an append DataStream of Row by specifying the class
    DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

    // convert the Table into an append DataStream of Tuple2<String, Integer>
    //   via a TypeInformation
    //   (Types is org.apache.flink.table.api.Types, whose STRING() and INT() are methods)
    TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
      Types.STRING(),
      Types.INT());
    DataStream<Tuple2<String, Integer>> dsTuple =
      tableEnv.toAppendStream(table, tupleType);

    // convert the Table into a retract DataStream of Row.
    //   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
    //   The boolean field indicates the type of the change.
    //   True is INSERT, false is DELETE.
    DataStream<Tuple2<Boolean, Row>> retractStream =
      tableEnv.toRetractStream(table, Row.class);
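To make the retract encoding more concrete, here is a minimal plain-Java sketch that does not use Flink at all. It keeps a running SUM per key and records every change the way a retract stream would: a `true` flag for an INSERT and a `false` flag for a DELETE that withdraws a previously emitted row. The `RetractStreamSketch` class, its `Change` type, and the sample data are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of retract encoding; this is not Flink API.
class RetractStreamSketch {

    // One change message: insert == true is INSERT, insert == false is DELETE.
    static final class Change {
        final boolean insert;
        final String key;
        final long sum;

        Change(boolean insert, String key, long sum) {
            this.insert = insert;
            this.key = key;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "(" + insert + ", " + key + ", " + sum + ")";
        }
    }

    // Emits the retract-encoded changelog of a running SUM grouped by key.
    static List<Change> runningSum(List<String> keys, List<Long> amounts) {
        Map<String, Long> sums = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            Long previous = sums.get(key);
            if (previous != null) {
                // The old result for this key is no longer valid: retract it.
                out.add(new Change(false, key, previous));
            }
            long updated = (previous == null ? 0L : previous) + amounts.get(i);
            sums.put(key, updated);
            out.add(new Change(true, key, updated)); // emit the new result
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> changes = runningSum(
            Arrays.asList("alice", "bob", "alice"),
            Arrays.asList(10L, 5L, 7L));
        for (Change c : changes) {
            System.out.println(c);
        }
        // prints:
        // (true, alice, 10)
        // (true, bob, 5)
        // (false, alice, 10)
        // (true, alice, 17)
    }
}
```

The first two inputs only produce INSERT messages, so on their own they would also fit append mode. The third input updates an already emitted sum, which is why an aggregating query like this cannot be converted with toAppendStream and needs the retract form.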

Convert a Table into a DataSet


  
    // get BatchTableEnvironment
    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

    // Table with two fields (String name, Integer age)
    Table table = ...;

    // convert the Table into a DataSet of Row by specifying a class
    DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

    // convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
    TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
      Types.STRING(),
      Types.INT());
    DataSet<Tuple2<String, Integer>> dsTuple =
      tableEnv.toDataSet(table, tupleType);

Table API

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

SQL API

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/

Source: lansonli.blog.csdn.net. Author: Lansonli. Copyright belongs to the original author; please contact the author before reposting.

Original link: lansonli.blog.csdn.net/article/details/116327559
