实时即未来,大数据项目车联网之合并分析电子围栏结果(22)
【摘要】 1. 合并分析电子围栏结果 1.1 读取电子围栏分析结果表数据并广播l 读取电子围栏分析结果表:electric_fence,并广播传递给下一个任务节点l 自定义source,继承RichSourceFunction对象,重写run、cancel方法l 获得已存在结果表中的车辆和最早出现在结果表中的id,并广播结果流l 主类实现电子围栏分析第5大步//TODO 5.读取电子围栏分析结果...
1. 合并分析电子围栏结果
1.1 读取电子围栏分析结果表数据并广播
l 读取电子围栏分析结果表:electric_fence,并广播传递给下一个任务节点
l 自定义source,继承RichSourceFunction对象,重写run、cancel方法
l 获得已存在结果表中的车辆和最早出现在结果表中的id,并广播结果流
l 主类实现电子围栏分析第5大步
//TODO 5.读取电子围栏分析结果表数据并广播
DataStream vehicleInfoStream = env.addSource(new MysqlVehicleInfoSource()).broadcast();
l 自定义source:MysqlVehicleInfoSource
/**
* @Description:自定义source
*/
public class MysqlVehicleInfoSource extends RichSourceFunction<HashMap<String, Long>> {
private Logger logger = LoggerFactory.getLogger("MysqlVehicleInfoSource");
ParameterTool globalJobParameters;
Connection conn = null;
PreparedStatement pstmt = null;
boolean isRunning = true;
HashMap vehInfoMap = new HashMap<String, Long>();
1.2 窗口流数据与广播流数据连接
电子围栏分析第6大步:90秒翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect
//TODO 6.将电子围栏分析结果表数据与电子围栏规则模型流数据进行合并 ConnectedStreams connectedStreams = electricFenceDataStream.connect(vehicleInfoStream);
1.3 判断窗口中车辆是否已存在Mysql结果表中
l 电子围栏分析第7大步:对电子围栏对象模型,添加uuid和inMysql
n 车辆是否已存在mysql表中,已存在则设置电子围栏id为uuid,设置当前车辆是否在mysql中为true
n 不存在mysql中,则设置uuid为Long的最大值减去当前时间(不重复的随机值)
n 第七步主类实现
//TODO 7.对电子围栏对象模型,添加uuid和inMysql(车辆是否已存在mysql表中) DataStream<ElectricFenceModel> outputStream = connectedStreams.flatMap(new ElectricFenceModelFunction());
n 自定义CoFlatMapFunction
/**
* @Description:自定义CoFlatMapFunction
*/
public class ElectricFenceModelFunction implements CoFlatMapFunction<ElectricFenceModel, HashMap<String, Long>, ElectricFenceModel> {
Logger logger = LoggerFactory.getLogger(ElectricFenceModelFunction.class);
HashMap vehInfo = new HashMap<String, Long>();
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)