2021年大数据Flink(二十):案例二 基于数量的滚动和滑动窗口
【摘要】
目录
案例二 基于数量的滚动和滑动窗口
需求
代码实现
案例二 基于数量的滚动和滑动窗口
需求
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗...
目录
案例二 基于数量的滚动和滑动窗口
需求
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
代码实现
-
package cn.itcast.window;
-
-
import lombok.AllArgsConstructor;
-
import lombok.Data;
-
import lombok.NoArgsConstructor;
-
import org.apache.flink.api.common.functions.MapFunction;
-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-
/**
-
* Author itcast
-
* Desc
-
* nc -lk 9999
-
* 有如下数据表示:
-
* 信号灯编号和通过该信号灯的车的数量
-
9,3
-
9,2
-
9,7
-
4,9
-
2,6
-
1,5
-
2,3
-
5,7
-
5,4
-
* 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
-
* 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
-
*/
-
public class WindowDemo02_CountWindow {
-
public static void main(String[] args) throws Exception {
-
//1.env
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
//2.Source
-
DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
-
-
//3.Transformation
-
//将9,3转为CartInfo(9,3)
-
SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
-
@Override
-
public CartInfo map(String value) throws Exception {
-
String[] arr = value.split(",");
-
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
-
}
-
});
-
-
//分组
-
//KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
-
-
// * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
-
//countWindow(long size, long slide)
-
SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
-
.keyBy(CartInfo::getSensorId)
-
//.countWindow(5L, 5L)
-
.countWindow( 5L)
-
.sum("count");
-
-
// * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
-
//countWindow(long size, long slide)
-
SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
-
.keyBy(CartInfo::getSensorId)
-
.countWindow(5L, 3L)
-
.sum("count");
-
-
-
//4.Sink
-
//result1.print();
-
/*
-
1,1
-
1,1
-
1,1
-
1,1
-
2,1
-
1,1
-
*/
-
result2.print();
-
/*
-
1,1
-
1,1
-
2,1
-
1,1
-
2,1
-
3,1
-
4,1
-
*/
-
-
//5.execute
-
env.execute();
-
}
-
@Data
-
@AllArgsConstructor
-
@NoArgsConstructor
-
public static class CartInfo {
-
private String sensorId;//信号灯id
-
private Integer count;//通过该信号灯的车的数量
-
}
-
}
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116278662
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)