基于FusionInsight开发智能搜车系统
一、背景
1.1、业务背景
XX市交警智能搜车系统,通过各卡口摄像头采集每天过往的车辆信息,存入数据库。交警人员在页面上可根据灵活条件进行车辆信息查询、违章查询、或者描绘车辆轨迹。
卡口数据包括两种:1、以csv格式保存的历史文本数据;2、实时卡口数据
项目需求:
1. 根据精确车牌号查出车辆信息
2. 根据模糊车牌号查出车辆信息
3. 查出没有系安全带的违法车辆信息
4. 查处符合某时间规则的车辆信息
5. 根据车辆颜色和品牌查出车辆信息
1.2、平台选型
FI选型使用 HD8.0版本,对应Spark版本2.4.5,ES版本7.6.0,HBase版本2.2.3
二、方案设计
2.1、数据模型
车辆详细信息字段有上百个,全量保存在HBase表中,如下是抽取出的部分需做索引的字段,存储到ES中
字段 | 样例数据 | 数据类型 | 说明 |
id | 4p9tqo41ksu7 | string | 随机字符串,唯一标记 |
checkpoint_id | A21001040 | string | 卡口编号 |
time | 2019-03-01 10:57:46 | string | 采集时间 |
plate_no | 粤123456 | string | 车牌号 |
plate_color | 1 | int | 车牌号颜色标记: 白、灰、黑、红、紫、蓝、黄、绿、青、棕、粉红(0-10) |
vehicle_color | 1 | int | 车辆颜色: 白、灰、黑、红、紫、蓝、黄、绿、青、棕、粉红(0-10) |
vehicle_type | SUV | string | 车辆类型 |
vehicle_brand | 本田 | string | 车辆品牌 |
smoking | 1 | int | 是否抽烟 1:是 0:否 |
safebelt | 1 | int | 是否系安全带 1:是 0:否 |
2.2、数据流向
2.3、HBase表设计
1、避免单表数据量过大,以月为单位建表
2、以单条记录唯一主键 id 作为rowkey
三、历史数据处理
1、历史数据通过bulkload方式导入HBase,具体步骤可参考帖子 https://bbs.huaweicloud.com/forum/thread-66116-1-1.html
2、导入HBase的数据再导入ES,可通过HBase2ES迁移工具,具体可参考 https://bbs.huaweicloud.com/forum/thread-71359-1-1.html
四、实时数据流
4.1、模拟数据流生成
我们通过python脚本(见附件)定时生成数据文件,定时放到指定目录,例如/var/log/realtimeLog
脚本data_gen.py存放于部署有flume客户端的节点的/opt/test目录下,执行命令为
nohup python data_gen.py -i /opt/test -o /var/log/realtimeLog &
生成的样例数据如图:
4.2、配置Flume
我们这里直接将数据文件通过Flume发往Kafka,使用Kafka普通模式端口,检查kafka配置 allow.everyone.if.no.acl.found为true
Flume配置文件通过Manager页面上的Flume配置工具生成,如图:
Flume客户端下载后,解压缩,得到安装脚本,路径为:
/tmp/flume-Client/FusionInsight_Cluster_1_Flume_ClientConfig/Flume/FlumeClient
通过该路径下的install.sh 安装flume客户端
./install.sh -d /opt/realtimeFlume -c /opt/test/properties.properties -f 10.244.230.213 -n realtimeTest
其中-d为安装目录,-c为指定的上面生成的配置文件,-f为Flume Monitor实例的一个IP,-n为客户端指定的名称
安装完毕后,启动测试数据生成脚本
通过kafka脚本验证数据是否有生成:
4.3、SparkStreaming读取kafka数据
该部分详细可参考产品文档《应用开发指南》-《安全模式》-《Spark2x开发指南》-《开发程序》-《SparkStreaming对接Kafka0-10程序》-《Java样例代码》部分
4.4、SparkStreaming写入HBase和ES
获取到 微批 JavaStreamingContext,通过foreachRDD写入HBase和ES
HBase主要调用HBase的API,ES调用的是low level rest Client接口,相关代码可参考附件
五、打包
建议不要将依赖打成一个包,如果有版本更改,可方便后面替换依赖包
通过 Artifacts 打包如图
可能碰到如下错误,是因为没有引入scala的SDK
在Global Libraries中引入scala的SDK即可
六、配置客户端
因为Kafka的认证信息是通过JAAS认证机制,通过Spark自带的—keytab自己无法解决认证对接的问题,需要单独处理。
Spark中driver和executor默认分别使用客户端/Spark2x/spark/conf目录下的jaas.conf(driver端)和jaas-zk.conf(executor端)进行认证。
1、jaas.conf文件内容参考,注意keyTab指定的是提交任务的节点上存放user.keytab的绝对路径
Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/opt/client/Spark2x/spark/user.keytab" principal="fwc" useTicketCache=false storeKey=true debug=true; };
2、jaas-zk.conf文件内容参考,这里keyTab是上传到executor后的user.keytab的路径,使用相对路径
Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="./user.keytab" principal="fwc" useTicketCache=false storeKey=true debug=true; }; KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="./user.keytab" principal="fwc" useTicketCache=false storeKey=true debug=true; };
3、将连接HBase的依赖配置文件拷贝到客户端/Spark2x/spark/conf目录下
包括HBase客户端下的 core-site.xml、hdfs-site.xml和hbase-site.xml
4、创建提交任务的临时目录,例如/opt/sparkTest,如图,将依赖的jar包放到该目录下的lib目录下,创建启动任务的submit.sh
该目录下的连接ES的配置文件为 esParams.properties,该文件参数参考附件
七、提交任务查看执行结果
7.1、提交spark任务
执行上面的 /opt/sparkTest/submit.sh脚本即可
7.2、查看hbase数据
通过hbase shell命令行可查询数据已经导入 vehicle_table_202008 表,后缀是当前的年月
7.3、验证ES中数据已有数据
- 点赞
- 收藏
- 关注作者
评论(0)