实时即未来,大数据项目车联网之电子围栏中ConnectStreamed应用(20)

举报
Maynor学长 发表于 2022/10/31 12:37:13 2022/10/31
【摘要】 theme: smartblue持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第20天,点击查看活动详情 1. 电子围栏中ConnectStreamed应用 1.1 ConnectedStreams简介 1.1.1 connect流说明第二节学习了广播状态,广播状态把流数据广播到所有的下游task,当出现两种流数据需要进行合并与计算时,就需要用到到connec...

theme: smartblue

持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第20天,点击查看活动详情

1. 电子围栏中ConnectStreamed应用

1.1 ConnectedStreams简介

1.1.1 connect流说明

第二节学习了广播状态,广播状态把流数据广播到所有的下游task,当出现两种流数据需要进行合并与计算时,就需要用到到connect或者union

l connect operator

n 只能连接两个数据流

n 连接的两个数据流类型可以不一致

n 仅作用于DataStream

n 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态

l union operator

n 可以连接多个流

n 连接数据流的类型必须一致,否则报错

n 可以作用于DataStream,也可以作用于DataSet

l connect操作仅适用操作DataStream,它会将两个流中匹配的数据进行处理,不匹配不会进行处理,它会分别处理两个流,比join操作更加自由。它通常使用于处理一个BroadcastStream和另一个DataStream中的数据,这个BroadcastStream中的数据一般情况下不会改变,用于装载一些全局的配置文件。

1.1.2 connect流使用场景

connect经常被应用在对一个数据流使用另外一个流进行控制处理的场景上,如下图所示。控制流可以是阈值、规则、机器学习模型或其他参数。在电子围栏分析任务中,connect连接的是原始车辆数据流与电子围栏广播流

![image-20221029151454585](/Users/chinamanor/Library/Application Support/typora-user-images/image-20221029151454585.png)

注意:Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流数据的流入先后顺序。

l 电子围栏分析任务第三大步:

n 原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream<ElectricFenceModel>)

//TODO 3.原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream<ElectricFenceModel>) ConnectedStreams<ItcastDataPartObj, HashMap<String, ElectricFenceResultTmp>> electricFenceVinsConnectStream = itcastJsonDataStream.connect(electricFenceVinsStream); SingleOutputStreamOperator<ElectricFenceModel> efModelDataStream = electricFenceVinsConnectStream.flatMap( new ElectricFenceRulesFunction()); efModelDataStream.printToErr(“电子围栏车辆规则与原始数据关联后的数据>>>”);
// 电子围栏规则计算模型 @Data @AllArgsConstructor public class ElectricFenceModel implements Comparable<ElectricFenceModel>{ //车架号 private String vin = “”; //电子围栏结果表UUID private Long uuid = -999999L; //上次状态 0 里面 1 外面 private int lastStatus = -999999; //当前状态 0 里面 1 外面 private int nowStatus = -999999; //位置时间 private String gpsTime = “”; //位置纬度-- private Double lat = -999999D; //位置经度-- private Double lng = -999999D; //电子围栏ID private int eleId = -999999; //电子围栏名称 private String eleName = “”; //中心点地址 private String address = “”; //中心点纬度 private Double latitude; //中心点经度 private Double longitude = -999999D; //电子围栏半径 private Float radius = -999999F; //出围栏时间 private String outEleTime = null; //进围栏时间 private String inEleTime = null; //是否在mysql结果表中 private Boolean inMysql = false; //状态报警 0:出围栏 1:进围栏 private int statusAlarm = -999999; //报警信息 private String statusAlarmMsg = “”; //终端时间 private String terminalTime = “”; // 扩展字段 终端时间 private Long terminalTimestamp = -999999L; public ElectricFenceModel(){ } @Override public int compareTo(ElectricFenceModel o) { if(this.getTerminalTimestamp() > o.getTerminalTimestamp()){ return 1; }else if(this.getTerminalTimestamp() < o.getTerminalTimestamp()){ return -1; } else { return 0; } } // 省略get、set、toString方法 }

1.2 Broadcast+Connect+CoFlatmap+CoMap整合实战

l CoFlatMap应用说明

l CoMap, CoFlatMap ConnectedStreams → DataStream 类似于连接数据流上的map和flatMap

l 用法:

n CoMap

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override public Boolean map1(Integer value) { return true; } @Override public Boolean map2(String value) { return false; }});

n CoFlatMap

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override public void flatMap1(Integer value, Collector<String> out) { out.collect(value.toString()); } @Override public void flatMap2(String value, Collector<String> out) { for (String word: value.split(" ")) { out.collect(word); } }});

l 整合输出流数据实战:Broadcast、Connect、CoFlatmap、CoMap

n 在测试目录下创建任务:TestDstreamOperatorTask

img

n 设置流式任务

n 分别创建测试数据流和广播流

n connect两个流,并进行flatMap操作

n 自定义CoFlatMapFunction函数对象:DstreamOperatorFlatMap,实现CoFlatMapFunction<IN1, IN2, OUT>接口

n 使用map方法替换flatMap方法,查看效果

1.3 两点之间球面距离计算

我们使用的外卖APP,美团、饿了么等平台,都可以根据距离进排序,可以看到商家距离我们的大致位置,这种基于位置的服务统一被称为LBS(Location Based Service)。

img

l 已知经纬度的两点之间计算直线距离,由于地球是椭圆的球体,计算的时候比较麻烦,因此计算时是将作为球体计算的。也可以说在地球上计算两点之间的直线距离,实际上是计算的球面距离;

img

l 计算两点之间的球面具体有几下几种方法:

n 编写代码实现Great-circle distance算法(大量使用余弦函数,对于短距离,误差大)

img

n 编写代码实现Haversine formula算法(大量采用正弦函数,对于短距离,误差小)

img

n 使用第三方jar计算(准确、简单)

<dependency> <groupId>org.gavaghan</groupId> <artifactId>geodesy</artifactId> <version>${geodesy.version}</version></dependency>

l 球面距离计算工具类:DistanceCaculateUtil

n 编写计算距离方法getDistanceMeter

n 传入两点经纬度计算距离方法ellipsoidMethodDistance

n 使用椭圆计算方法ellipsoidMethodDistance:计算参数(两点的经纬度、Ellipsoid.Sphere)

l 距离计算工具类测试

img

public class TestDistanceCaculateUtil { public static void main(String[] args) {// DistanceCaculateUtil.getDistance(null,null, null, null); // https://lbs.amap.com/api/javascript-api/example/calcutation/calculate-distance-between-two-markers Double distance = DistanceCaculateUtil.getDistance(39.923423, 116.368904, 39.922501, 116.387271); System.out.println(distance); }}

1.4 电子围栏中自定义对象实现CoFlatMap函数

l 自定义CoFlatMap函数

/
 * 自定义coFlatMapFunction的函数对象,实现原始车辆和电子围栏广播流数据的合并,返回电子围栏规则模型流数据
 */
public class ElectricFenceRulesFunction implements CoFlatMapFunction<ItcastDataPartObj, HashMap<String, ElectricFenceResultTmp>, ElectricFenceModel> {
    //定义电子围栏广播流数据的对象
    HashMap<String, ElectricFenceResultTmp> vehicalInfoMap = new HashMap<>();

    /
     * 作用于itcastJsonDataStream流的flatmap操作
     * @param dataPartObj
     * @param collector
     * @throws Exception
     */
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。