实时即未来,大数据项目车联网之电子围栏模型使用翻滚窗口(21)
theme: smartblue
持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第21天,点击查看活动详情
1. 电子围栏模型使用翻滚窗口
l 创建90秒翻滚窗口,计算中电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)
n 设置电子围栏水印
n 根据vin进行分组
n 创建翻滚窗口,90秒为一个窗口周期
n 自定义窗口函数,计算电子围栏中和围栏外车辆信息
1.1 创建90秒翻滚窗口
l 电子围栏分析任务中第三大步执行后的第四大步:电子围栏规则模型流生成水印、分组、创建翻滚窗口、创建窗口函数
n 自定义水印:ElectricFenceWatermark
n 根据vin进行分组:electricFenceModel.getVin
n 设置窗口为90间隔的翻滚窗口
n 自定义窗口函数:ElectricFenceWindowFunction
//TODO 4.创建90秒翻滚窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置) DataStream<ElectricFenceModel> electricFenceDataStream = efModelDataStream .assignTimestampsAndWatermarks(new ElectricFenceWatermark()) .keyBy(ElectricFenceModel::getVin) .window(TumblingEventTimeWindows.of(Time.seconds(90))) .apply(new ElectricFenceWindowFunction()); electricFenceDataStream.print(“电子围栏结果>>>”); |
---|
// 4.1 自定义水印 解决相邻两条数据乱序问题 public class ElectricFenceWatermark implements AssignerWithPeriodicWatermarks<ElectricFenceModel>, Serializable { Long currentMaxTimestamp = 0L; @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp); } @Override public long extractTimestamp(ElectricFenceModel element, long previousElementTimestamp) { currentMaxTimestamp = Math.max(element.getTerminalTimestamp(), currentMaxTimestamp); return element.getTerminalTimestamp(); } } |
1.2 自定义窗口函数
自定义窗口函数,实现对90秒翻滚窗口中的每一个窗口进行计算,计算出电子围栏模型数据结果
l 电子围栏模型计算逻辑分析:
n 对单个窗口中的车辆数据根据terminalTime进行排序
n 从redis中获取车辆vin对应的flag标记(车辆在围栏中或不在)
n 遍历DStream<ElectricFenceModel>的元素
n 对当前一个翻滚窗口中vin有多条数据,计算车辆在围栏中和围栏外的次数
u 单个车辆出现在围栏中的次数和单个车辆出现在围栏外的次数
u 根据每个vin和窗口间隔时间划分的单个窗口,相邻两条数据当前电子围栏状态,设置电子围栏模型对象
n 写入车辆是否在电子围栏标记到***state***中作为缓存,flag:0代表在围栏中;flag:1,代表不在围栏中
n 判断同一车辆在电子围栏中的flag之前是否存在与***state***中 且 判断flag是否成功写入state中
n 如果经纬度、gpsTime都不为空,继续设置电子模型对象
n 根据车辆最后一次的围栏状态和本次车辆的flag,设置电子围栏状态警告、进出围栏时间
u 如果redis缓存中lastStatusRValue(flag)为1,现在flag为0,即是进围栏,则设置电子围栏状态警告为进围栏(1),设置终端时间为进围栏时间,反之则为出围栏
l 自定义WindowFunction编程实现
/**
* 电子围栏自定义windowFunction,处理电子围栏判断逻辑
*/
public class ElectricFenceWindowFunction extends RichWindowFunction<ElectricFenceModel, ElectricFenceModel, String, TimeWindow> {
String stateStartWith = "electricFence_";
private MapState<String, Integer> state = null;
/**
* 初始化资源,只被执行一次
* @param parameters
* @throws Exception
*/
总结:
完成了实时分析车辆数据是否在电子围栏中,并得到分析后返回的DStream数据流:DataStream<ElectricFenceModel>;在电子围栏分析结果数据落地之前,要考虑到除第一个窗口之外的实时数据进来,需去查询结果表数据,合并数据之后再入库,如车辆已存在结果表中,需更新,不存在则插入。
- 点赞
- 收藏
- 关注作者
评论(0)