2021年大数据Flink(三十六):Table与SQL 案例三
【摘要】
目录
案例三
需求
编码步骤
代码实现-方式1
代码实现-方式2
案例三
需求
使用Flink SQL统计5秒内每个用户的订单总数、订单最大金额和订单最小金额
也就是每隔5秒统计最近5秒的每个用户的订单总数、订单的最大金额、订单的最小金额
上面的需求...
目录
案例三
需求
使用Flink SQL统计5秒内每个用户的订单总数、订单最大金额和订单最小金额,
也就是每隔5秒统计一次最近5秒内每个用户的订单总数、订单最大金额和订单最小金额。
上面的需求使用流处理中基于时间的滚动窗口(Tumbling Window)就可以实现!
接下来使用Flink Table & SQL API来实现。
编码步骤
1.创建环境
2.使用自定义函数模拟实时流数据
3.设置事件时间和Watermark
4.注册表
5.执行sql-可以使用sql风格或table风格(了解)
6.输出结果
7.触发执行
代码实现-方式1
-
package cn.itcast.sql;
-
-
import lombok.AllArgsConstructor;
-
import lombok.Data;
-
import lombok.NoArgsConstructor;
-
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-
import org.apache.flink.api.java.tuple.Tuple2;
-
import org.apache.flink.streaming.api.datastream.DataStream;
-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-
import org.apache.flink.table.api.Table;
-
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
import org.apache.flink.types.Row;
-
-
import java.time.Duration;
-
import java.util.Random;
-
import java.util.UUID;
-
import java.util.concurrent.TimeUnit;
-
-
import static org.apache.flink.table.api.Expressions.$;
-
-
/**
-
* Author itcast
-
* Desc
-
*/
-
public class FlinkSQL_Table_Demo04 {
-
public static void main(String[] args) throws Exception {
-
//1.准备环境
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
-
//2.Source
-
DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
-
private Boolean isRunning = true;
-
@Override
-
public void run(SourceContext<Order> ctx) throws Exception {
-
Random random = new Random();
-
while (isRunning) {
-
Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
-
TimeUnit.SECONDS.sleep(1);
-
ctx.collect(order);
-
}
-
}
-
-
@Override
-
public void cancel() {
-
isRunning = false;
-
}
-
});
-
-
//3.Transformation
-
DataStream<Order> watermakerDS = orderDS
-
.assignTimestampsAndWatermarks(
-
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
-
.withTimestampAssigner((event, timestamp) -> event.getCreateTime())
-
);
-
-
//4.注册表
-
tEnv.createTemporaryView("t_order", watermakerDS,
-
$("orderId"), $("userId"), $("money"), $("createTime").rowtime());
-
-
//5.执行SQL
-
String sql = "select " +
-
"userId," +
-
"count(*) as totalCount," +
-
"max(money) as maxMoney," +
-
"min(money) as minMoney " +
-
"from t_order " +
-
"group by userId," +
-
"tumble(createTime, interval '5' second)";
-
-
Table ResultTable = tEnv.sqlQuery(sql);
-
-
//6.Sink
-
//将SQL的执行结果转换成DataStream再打印出来
-
//toAppendStream → 将计算后的数据append到结果DataStream中去
-
//toRetractStream → 将计算后的新的数据在DataStream原数据的基础上更新true或是删除false
-
DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
-
resultDS.print();
-
-
env.execute();
-
-
}
-
-
@Data
-
@AllArgsConstructor
-
@NoArgsConstructor
-
public static class Order {
-
private String orderId;
-
private Integer userId;
-
private Integer money;
-
private Long createTime;
-
}
-
}
toAppendStream → 将计算结果以追加(append)的方式输出到结果DataStream中,适用于只有插入、不会更新的结果
toRetractStream → 输出Tuple2<Boolean, Row>,其中true表示新增(插入)一条结果,false表示撤回(删除)之前已输出的一条结果
代码实现-方式2
-
package cn.itcast.sql;
-
-
import lombok.AllArgsConstructor;
-
import lombok.Data;
-
import lombok.NoArgsConstructor;
-
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-
import org.apache.flink.api.java.tuple.Tuple2;
-
import org.apache.flink.streaming.api.datastream.DataStream;
-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-
import org.apache.flink.table.api.Table;
-
import org.apache.flink.table.api.Tumble;
-
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
import org.apache.flink.types.Row;
-
-
import java.time.Duration;
-
import java.util.Random;
-
import java.util.UUID;
-
import java.util.concurrent.TimeUnit;
-
-
import static org.apache.flink.table.api.Expressions.$;
-
import static org.apache.flink.table.api.Expressions.lit;
-
-
/**
-
* Author itcast
-
* Desc
-
*/
-
public class FlinkSQL_Table_Demo05 {
-
public static void main(String[] args) throws Exception {
-
//1.准备环境
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
-
//2.Source
-
DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
-
private Boolean isRunning = true;
-
-
@Override
-
public void run(SourceContext<Order> ctx) throws Exception {
-
Random random = new Random();
-
while (isRunning) {
-
Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
-
TimeUnit.SECONDS.sleep(1);
-
ctx.collect(order);
-
}
-
}
-
-
@Override
-
public void cancel() {
-
isRunning = false;
-
}
-
});
-
-
//3.Transformation
-
DataStream<Order> watermakerDS = orderDS
-
.assignTimestampsAndWatermarks(
-
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
-
.withTimestampAssigner((event, timestamp) -> event.getCreateTime())
-
);
-
-
//4.注册表
-
tEnv.createTemporaryView("t_order", watermakerDS,
-
$("orderId"), $("userId"), $("money"), $("createTime").rowtime());
-
-
//查看表约束
-
tEnv.from("t_order").printSchema();
-
-
//5.TableAPI查询
-
Table ResultTable = tEnv.from("t_order")
-
//.window(Tumble.over("5.second").on("createTime").as("tumbleWindow"))
-
.window(Tumble.over(lit(5).second())
-
.on($("createTime"))
-
.as("tumbleWindow"))
-
.groupBy($("tumbleWindow"), $("userId"))
-
.select(
-
$("userId"),
-
$("userId").count().as("totalCount"),
-
$("money").max().as("maxMoney"),
-
$("money").min().as("minMoney"));
-
-
-
//6.将SQL的执行结果转换成DataStream再打印出来
-
DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
-
resultDS.print();
-
-
//7.excute
-
env.execute();
-
}
-
-
@Data
-
@AllArgsConstructor
-
@NoArgsConstructor
-
public static class Order {
-
private String orderId;
-
private Integer userId;
-
private Integer money;
-
private Long createTime;
-
}
-
}
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116357554
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)