2021年大数据Flink(十九):案例一 基于时间的滚动和滑动窗口
【摘要】
目录
案例一 基于时间的滚动和滑动窗口
需求
代码实现
案例一 基于时间的滚动和滑动窗口
需求
nc -lk 9999
有如下数据表示:
信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红...
目录
案例一 基于时间的滚动和滑动窗口
需求
nc -lk 9999
有如下数据表示:
信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
代码实现
-
package cn.itcast.window;
-
-
import lombok.AllArgsConstructor;
-
import lombok.Data;
-
import lombok.NoArgsConstructor;
-
import org.apache.flink.api.common.functions.MapFunction;
-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-
import org.apache.flink.streaming.api.windowing.time.Time;
-
-
/**
-
* Author itcast
-
* Desc
-
* nc -lk 9999
-
* 有如下数据表示:
-
* 信号灯编号和通过该信号灯的车的数量
-
9,3
-
9,2
-
9,7
-
4,9
-
2,6
-
1,5
-
2,3
-
5,7
-
5,4
-
* 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
-
* 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
-
*/
-
public class WindowDemo01_TimeWindow {
-
public static void main(String[] args) throws Exception {
-
//1.env
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
//2.Source
-
DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
-
-
//3.Transformation
-
//将9,3转为CartInfo(9,3)
-
SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
-
@Override
-
public CartInfo map(String value) throws Exception {
-
String[] arr = value.split(",");
-
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
-
}
-
});
-
-
//分组
-
//KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
-
-
// * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口
-
//timeWindow(Time size窗口大小, Time slide滑动间隔)
-
SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
-
.keyBy(CartInfo::getSensorId)
-
//.timeWindow(Time.seconds(5))//当size==slide,可以只写一个
-
//.timeWindow(Time.seconds(5), Time.seconds(5))
-
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-
.sum("count");
-
-
// * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口
-
SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
-
.keyBy(CartInfo::getSensorId)
-
//.timeWindow(Time.seconds(10), Time.seconds(5))
-
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
-
.sum("count");
-
-
//4.Sink
-
/*
-
1,5
-
2,5
-
3,5
-
4,5
-
*/
-
//result1.print();
-
result2.print();
-
-
//5.execute
-
env.execute();
-
}
-
@Data
-
@AllArgsConstructor
-
@NoArgsConstructor
-
public static class CartInfo {
-
private String sensorId;//信号灯id
-
private Integer count;//通过该信号灯的车的数量
-
}
-
}
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116247545
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)