2021年大数据Flink(三十二):Table与SQL案例准备 API
目录
API
获取环境
-
// **********************
-
-
// FLINK STREAMING QUERY
-
-
// **********************
-
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-
import org.apache.flink.table.api.EnvironmentSettings;
-
-
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
-
-
-
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
-
-
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
-
-
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
-
-
-
-
// ******************
-
-
// FLINK BATCH QUERY
-
-
// ******************
-
-
import org.apache.flink.api.java.ExecutionEnvironment;
-
-
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
-
-
-
-
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
-
-
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
-
-
-
-
// **********************
-
-
// BLINK STREAMING QUERY
-
-
// **********************
-
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-
import org.apache.flink.table.api.EnvironmentSettings;
-
-
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
-
-
-
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
-
-
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
-
-
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
-
-
-
-
// ******************
-
-
// BLINK BATCH QUERY
-
-
// ******************
-
-
import org.apache.flink.table.api.EnvironmentSettings;
-
-
import org.apache.flink.table.api.TableEnvironment;
-
-
-
-
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
-
-
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
创建表
-
// get a TableEnvironment
-
-
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
-
-
-
-
// table is the result of a simple projection query
-
-
Table projTable = tableEnv.from("X").select(...);
-
-
-
-
// register the Table projTable as table "projectedTable"
-
-
tableEnv.createTemporaryView("projectedTable", projTable);
-
tableEnvironment
-
-
.connect(...)
-
-
.withFormat(...)
-
-
.withSchema(...)
-
-
.inAppendMode()
-
-
.createTemporaryTable("MyTable")
查询表
Table API
-
// get a TableEnvironment
-
-
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
-
-
// register Orders table
-
-
// scan registered Orders table
-
-
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
-
-
Table revenue = orders
-
-
.filter($("cCountry")
-
-
.isEqual("FRANCE"))
-
-
.groupBy($("cID"), $("cName"))
-
-
.select($("cID"), $("cName"), $("revenue")
-
-
.sum()
-
-
.as("revSum"));
-
-
// emit or convert Table
-
-
// execute query
SQL
-
// get a TableEnvironment
-
-
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
-
-
-
-
// register Orders table
-
-
// compute revenue for all customers from France
-
-
Table revenue = tableEnv.sqlQuery(
-
-
"SELECT cID, cName, SUM(revenue) AS revSum " +
-
-
"FROM Orders " +
-
-
"WHERE cCountry = 'FRANCE' " +
-
-
"GROUP BY cID, cName"
-
-
);
-
-
// emit or convert Table
-
-
// execute query
-
-
// get a TableEnvironment
-
-
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
-
-
-
-
// register "Orders" table
-
-
// register "RevenueFrance" output table
-
-
// compute revenue for all customers from France and emit to "RevenueFrance"
-
-
tableEnv.executeSql(
-
-
"INSERT INTO RevenueFrance " +
-
-
"SELECT cID, cName, SUM(revenue) AS revSum " +
-
-
"FROM Orders " +
-
-
"WHERE cCountry = 'FRANCE' " +
-
-
"GROUP BY cID, cName"
-
-
);
写出表
-
// get a TableEnvironment
-
-
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
-
-
// create an output Table
-
-
final Schema schema = new Schema()
-
-
.field("a", DataTypes.INT())
-
-
.field("b", DataTypes.STRING())
-
-
.field("c", DataTypes.BIGINT());
-
-
tableEnv.connect(new FileSystem().path("/path/to/file"))
-
-
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
-
-
.withSchema(schema)
-
-
.createTemporaryTable("CsvSinkTable");
-
-
// compute a result Table using Table API operators and/or SQL queries
-
-
Table result = ...
-
-
// emit the result Table to the registered TableSink
-
-
result.executeInsert("CsvSinkTable");
与DataSet/DataStream集成
- Create a View from a DataStream or DataSet
-
-
// get StreamTableEnvironment
-
-
// registration of a DataSet in a BatchTableEnvironment is equivalent
-
-
StreamTableEnvironment tableEnv = ...;
-
-
-
-
// see "Create a TableEnvironment" section
-
-
DataStream<Tuple2<Long, String>> stream = ...
-
-
-
-
// register the DataStream as View "myTable" with fields "f0", "f1"
-
-
tableEnv.createTemporaryView("myTable", stream);
-
-
-
-
// register the DataStream as View "myTable2" with fields "myLong", "myString"
-
-
tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
- Convert a DataStream or DataSet into a Table
-
-
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
-
-
StreamTableEnvironment tableEnv = ...;
-
-
// see "Create a TableEnvironment" section
-
-
-
-
DataStream<Tuple2<Long, String>> stream = ...
-
-
// Convert the DataStream into a Table with default fields "f0", "f1"
-
-
-
-
Table table1 = tableEnv.fromDataStream(stream);
-
-
// Convert the DataStream into a Table with fields "myLong", "myString"
-
-
Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
-
-
- Convert a Table into a DataStream or DataSet
Convert a Table into a DataStream
Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
追加模式:只有当动态表仅通过 INSERT(插入)更改进行修改时,才能使用此模式,即该表是仅追加的,并且以前发出的结果永远不会被更新。
Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.
撤回模式:此模式始终可用。它使用布尔标志对插入和删除更改进行编码。
-
// get StreamTableEnvironment.
-
-
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
-
-
-
-
// Table with two fields (String name, Integer age)
-
-
Table table = ...
-
-
-
-
// convert the Table into an append DataStream of Row by specifying the class
-
-
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
-
-
-
-
// convert the Table into an append DataStream of Tuple2<String, Integer>
-
-
// via a TypeInformation
-
-
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
-
-
Types.STRING(),
-
-
Types.INT());
-
-
DataStream<Tuple2<String, Integer>> dsTuple =
-
-
tableEnv.toAppendStream(table, tupleType);
-
-
// convert the Table into a retract DataStream of Row.
-
-
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
-
-
// The boolean field indicates the type of the change.
-
-
// True is INSERT, false is DELETE.
-
-
DataStream<Tuple2<Boolean, Row>> retractStream =
-
-
tableEnv.toRetractStream(table, Row.class);
Convert a Table into a DataSet
-
// get BatchTableEnvironment
-
-
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
-
-
-
-
// Table with two fields (String name, Integer age)
-
-
Table table = ...
-
-
-
-
// convert the Table into a DataSet of Row by specifying a class
-
-
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
-
-
-
-
// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
-
-
Types.STRING(),
-
-
Types.INT());
-
-
DataSet<Tuple2<String, Integer>> dsTuple =
-
-
tableEnv.toDataSet(table, tupleType);
TableAPI
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
SQLAPI
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116327559
- 点赞
- 收藏
- 关注作者
评论(0)