实时即未来,大数据项目车联网之电子围栏分析任务设置(19)

举报
Maynor学长 发表于 2022/10/31 12:36:47 2022/10/31
【摘要】 theme: smartblue持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第19天,点击查看活动详情 1. 电子围栏分析任务设置 1.1 电子围栏分析任务步骤分析电子围栏任务主要有8大步骤:电子围栏分析任务设置、原始数据json解析、过滤异常数据读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)原始车辆数据与电子围栏广播流进行合并,生成电子围栏规...

theme: smartblue

持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第19天,点击查看活动详情

1. 电子围栏分析任务设置

1.1 电子围栏分析任务步骤分析

image-20221025202221344

电子围栏任务主要有8大步骤:

电子围栏分析任务设置、原始数据json解析、过滤异常数据

读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)

原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream<ElectricFenceModel>)

创建90秒翻滚窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)

读取电子围栏分析结果表数据并广播

翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect

对电子围栏对象模型,添加uuid和inMySQL(车辆是否已存在mysql表中)

电子围栏分析结果数据落地mysql,也可以选择落地mongo

1.2 电子围栏分析任务实现

主类实现:ElectricFenceTask

img

  • 分析任务设置,设置流式环境、Checkpoint、kafka等
public class ElectricFenceTask extends BaseTask {
    public static void main(String[] args) throws Exceptio* {
        /**
         * 实现步骤:
         * 1.电子围栏分析任务设置、原始数据json解析、过滤异常数据
         * 2.读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)
         * 3.原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream<ElectricFenceModel>)
         * 4.创建90秒翻滚窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)
         * 5.读取电子围栏分析结果表数据并广播
         * 6.90秒翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect
         * 7.对电子围栏对象模型,添加uuid和inMysql(车辆是否已存在mysql表中)
         * 8.电子围栏分析结果数据落地mysql|也可以选择落地mongo
         */
        //TODO 1.电子围栏分析任务设置、原始数据json解析、过滤异常数据
        StreamExecutionEnvironment env = getEnv(ElectricFenceTask.class.getSimpleName());

        //1.1:将kafka消费者实例添加到环境中
        DataStream<String> dataStreamSource = createKafkaStream(SimpleStringSchema.class);

        env.execute();
    }
}

简化为66个字段,包含主要字段

讲义关联资料\ItcastDataPartObj.java

简化后的对象对应的json解析工具类

讲义关联资料\JsonParsePartUtil.java

测试工具类对象

讲义关联资料\JsonPartParseUtilTest.java

  • 解析原始数据、过滤异常数据(第一步完成)
//1.2:将字符串转换成对象,过滤出来正常的数据
SingleOutputStreamOperator<ItcastDataPartObj> itcastJsonDataStream = dataStreamSource.map((MapFunction<String, ItcastDataPartObj>) obj -> {
    JsonParsePartUti> jsonParsePartUti> = new JsonParsePartUtil();
    retur* jsonParsePartUtil.parseJsonToObject(obj);
    //过滤出来异常数据,查询到正常数据返回
}).filter(itcastDataObj -> StringUtils.isEmpty(itcastDataObj.getErrorData()));
itcastJsonDataStream.print("原始数据>>>");
  • 读取已存在电子围栏中的车辆与电子围栏信息并形成广播流(第二大步)

1.添加Mysql电子围栏数据源

2.自定义source,实现加载电子围栏与电子围栏车辆数据

3.重写run方法,实现数据查询逻辑,并设置collect输出内容

4.重写cancel方法,实现资源释放,关闭连接

5.设置返回结果电子围栏与电子围栏车辆信息数据为广播流

1.3 广播状态与实现

从flink1.5版本开始,提供了新的流状态类型:广播状态(Broadcast)

什么是广播状态

  • broadcast state将一个流的某些数据广播到所有下游task,这些数据存储在本地并用于处理另一个流中的所有元素。简单的说,广播状态是可以联合处理两个事件流。

广播状态与其他操作状态不同的地方在于:

  • broadcast state 广播数据是map格式

  • 它适用于具有广播流和非广播流的特定操作

  • 特定操作具有以不同名称命名的多个广播状态

流状态查看官方说明:

img

  • Broadcasting DataStream → DataStream 广播DStream中的元素到每一个slot中,使用方法:dataStream.broadcast();

  • 主类中应用广播状态流

//TODO 2.读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)
DataStream<HashMap<String, ElectricFenceResultTmp>> electricFenceVinsStream = env.addSource(new MysqlElectricFenceSouce()).broadcast();
/**
 * 定义查询mysql数据库的电子围栏车辆表与电子围栏规则表的数据对应的javabean对象
 */
// 电子围栏转换临时对象
@Data
@AllArgsConstructor
public class ElectricFenceResultTmp {
    //电子围栏id
    private int id;
    //电子围栏名称
    private String name;
    //电子围栏中心地址
    private String address;
    //电子围栏半径
    private float radius;

    @Override
    public String toString() {
        retur* "ElectricFenceResultTmp{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", address='" + address + '\'' +
                ", radius=" + radius +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                ", startTime=" + startTime +
                ", endTime=" + endTime +
                '}';
    }

    //电子围栏中心点的经度
    private double longitude;
    //电子围栏中心点的维度
    private double latitude;
    //电子围栏的开始时间
    private Date startTime;
    //电子围栏的结束时间
    private Date endTime;
    // 省略get、set、有参构造方法
}
  • 自定义source实现
/**
 * 读取已经存在的电子围栏中的车辆和电子围栏规则表的数据
 * String:vin车架号(电子围栏中的车辆)
 * ElectricFenceResultTmp:电子围栏规则表的数据(电子围栏规则表)
 */
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。