基于FusionInsight开发智能搜车系统

举报
suwei 发表于 2020/08/27 16:11:11 2020/08/27
【摘要】 基于FusionInsight平台开发搜车系统,用HBase存储历史海量日志,基于ES做查询

一、背景

1.1、业务背景

XX交警智能搜车系统,通过各卡口摄像头采集每天过往的车辆信息,存入数据库。交警人员在页面上可根据灵活条件进行车辆信息查询、违章查询、或者描绘车辆轨迹。

卡口数据包括两种:1、以csv格式保存的历史文本数据;2、实时卡口数据

项目需求:

1.         根据精确车牌号查出车辆信息

2.         根据模糊车牌号查出车辆信息

3.         查出没有系安全带的违法车辆信息

4.         查处符合某时间规则的车辆信息

5.         根据车辆颜色和品牌查出车辆信息

1.2、平台选型

FI选型使用 HD8.0版本,对应Spark版本2.4.5,ES版本7.6.0,HBase版本2.2.3

二、方案设计

2.1、数据模型

车辆详细信息字段有上百个,全量保存在HBase表中,如下是抽取出的部分需做索引的字段,存储到ES中

字段 样例数据 数据类型 说明
id 4p9tqo41ksu7 string 随机字符串,唯一标记
checkpoint_id A21001040 string 卡口编号
time 2019-03-01 10:57:46 string 采集时间
plate_no 粤123456 string 车牌号
plate_color 1 int 车牌号颜色标记: 白、灰、黑、红、紫、蓝、黄、绿、青、棕、粉红(0-10
vehicle_color 1 int 车辆颜色: 白、灰、黑、红、紫、蓝、黄、绿、青、棕、粉红(0-10
vehicle_type SUV string 车辆类型
vehicle_brand 本田 string 车辆品牌
smoking 1 int 是否抽烟 1:是  0:否
safebelt 1 int 是否系安全带 1:是  0:否

2.2、数据流向

image.png

2.3、HBase表设计

1、避免单表数据量过大,以月为单位建表

2、以单条记录唯一主键 id 作为rowkey

三、历史数据处理

1、历史数据通过bulkload方式导入HBase,具体步骤可参考帖子  https://bbs.huaweicloud.com/forum/thread-66116-1-1.html

2、导入HBase的数据再导入ES,可通过HBase2ES迁移工具,具体可参考 https://bbs.huaweicloud.com/forum/thread-71359-1-1.html

四、实时数据流

4.1、模拟数据流生成

我们通过python脚本(见附件)定时生成数据文件,定时放到指定目录,例如/var/log/realtimeLog

脚本data_gen.py存放于部署有flume客户端的节点的/opt/test目录下,执行命令为

nohup python data_gen.py  -i  /opt/test  -o /var/log/realtimeLog &

生成的样例数据如图:

image.png

4.2、配置Flume

我们这里直接将数据文件通过Flume发往Kafka,使用Kafka普通模式端口,检查kafka配置 allow.everyone.if.no.acl.found为true

Flume配置文件通过Manager页面上的Flume配置工具生成,如图:

image.png

Flume客户端下载后,解压缩,得到安装脚本,路径为:

/tmp/flume-Client/FusionInsight_Cluster_1_Flume_ClientConfig/Flume/FlumeClient

通过该路径下的install.sh 安装flume客户端

./install.sh -d /opt/realtimeFlume -c /opt/test/properties.properties -f 10.244.230.213 -n realtimeTest

其中-d为安装目录,-c为指定的上面生成的配置文件,-f为Flume Monitor实例的一个IP,-n为客户端指定的名称

安装完毕后,启动测试数据生成脚本

通过kafka脚本验证数据是否有生成:

image.png

4.3、SparkStreaming读取kafka数据

该部分详细可参考产品文档《应用开发指南》-《安全模式》-《Spark2x开发指南》-《开发程序》-《SparkStreaming对接Kafka0-10程序》-《Java样例代码》部分

image.png

4.4、SparkStreaming写入HBase和ES

获取到 微批 JavaStreamingContext,通过foreachRDD写入HBase和ES

HBase主要调用HBase的API,ES调用的是low level rest Client接口,相关代码可参考附件

image.png

五、打包

建议不要将依赖打成一个包,如果有版本更改,可方便后面替换依赖包

image.png

通过 Artifacts 打包如图

image.png

可能碰到如下错误,是因为没有引入scala的SDK

image.png

在Global Libraries中引入scala的SDK即可

image.png

六、配置客户端

因为Kafka的认证信息是通过JAAS认证机制,通过Spark自带的—keytab自己无法解决认证对接的问题,需要单独处理。

Spark中driver和executor默认分别使用客户端/Spark2x/spark/conf目录下的jaas.conf(driver端)和jaas-zk.conf(executor端)进行认证。

1、jaas.conf文件内容参考,注意keyTab指定的是提交任务的节点上存放user.keytab的绝对路径


Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/opt/client/Spark2x/spark/user.keytab"
principal="fwc"
useTicketCache=false
storeKey=true
debug=true;
};

2、jaas-zk.conf文件内容参考,这里keyTab是上传到executor后的user.keytab的路径,使用相对路径

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./user.keytab"
principal="fwc"
useTicketCache=false
storeKey=true
debug=true;
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./user.keytab"
principal="fwc"
useTicketCache=false
storeKey=true
debug=true;
};

3、将连接HBase的依赖配置文件拷贝到客户端/Spark2x/spark/conf目录下

包括HBase客户端下的 core-site.xml、hdfs-site.xml和hbase-site.xml

4、创建提交任务的临时目录,例如/opt/sparkTest,如图,将依赖的jar包放到该目录下的lib目录下,创建启动任务的submit.sh

image.png

该目录下的连接ES的配置文件为 esParams.properties,该文件参数参考附件

七、提交任务查看执行结果

7.1、提交spark任务

执行上面的 /opt/sparkTest/submit.sh脚本即可

7.2、查看hbase数据

通过hbase shell命令行可查询数据已经导入 vehicle_table_202008 表,后缀是当前的年月

7.3、验证ES中数据已有数据

image.png



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。