实时即未来,大数据项目车联网之驾驶行程采样入库(15)
theme: smartblue
持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第15天,点击查看活动详情
1. 驾驶行程采样入库
l 驾驶行程数据筛选入库,为第五节的行程指标采样分析的数据
行程指用户在一定时间内,连续的、不间断的驾驶车辆进行移动行为。根据停车时间超过15分钟划分行程。驾驶行程采样指的是从一份数据里面取部分数据,这部分数据作为驾驶行程数据的样本。
1.1 驾驶行程采样逻辑
l 驾驶行程数据的采样的目的:
n 确定行程划分的划分时间段(t检验与f检验样本数据)
n 对样本数据进行分析(核对行程划分的对应指标分析结果)
n 对车辆打标签,进行车辆画像(用户画像处理)
l 驾驶行程采样结果数据字段:(都是字符串类型,同一个车辆采样数据放一个字段)
n rowkey = vin + “_” + terminalTime的毫秒值反转
n soc:剩余电量百分比
n mileage:总里程数
n speed:速度
n gps:地理位置
n terminalTime:终端时间
n processTime:计算时间,分析结果的当前时间
l 采样对应行程划分中四个时间段:5分钟、10分钟、15分钟、20分钟,取得样本数据
1.2 添加行程划分水位线(最大乱序时间为30秒)
l 主任务中定义:****TripDriveWatermark****
//TODO 8)添加水位线(允许数据延迟到达30秒钟)
SingleOutputStreamOperator<ItcastDataObj> tripDriveWatermark = itcastJsonStream
.assignTimestampsAndWatermarks(new TripDriveWatermark());
l 自定义水位线,通过时间戳定期分配时间戳并生成水位线(AssignerWithPeriodicWatermarks )
/**
* TODO 驾驶行程自定义水位线对象:解决数据迟到30秒的问题
*/
public class TripDriveWatermark implements AssignerWithPeriodicWatermarks<ItcastDataObj>, Serializable {
// todo 允许最大乱序时间为:30秒
long maxOutOfOrderness = 1000 * 30;
// todo 初始化当前水位线时间戳
Long currentMaxTimestamp = 0L;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(ItcastDataObj element, long previousElementTimestamp) {
currentMaxTimestamp = Math.max(element.getTerminalTimeStamp(), currentMaxTimestamp);
return element.getTerminalTimeStamp();
}
}
1.3 根据vin进行分组
//TODO 9)根据vin进行分组
KeyedStream<ItcastDataObj, String> keyedStream = tripDriveWatermark.keyBy(ItcastDataObj::getVin);
1.4 驾驶行程分析创建会话窗口(15分钟划分一个行程)
l 15分钟划分一个行程是通过采样数据,进行t检验和f检验得出的结果,因此,此处样本数据采集,在确定之前是分别有A(5min)、B(10min)、C(15min)、D(20min),所以样本计算在确定窗口时间之前,应分别设置窗口时间为5、10、15、20分钟
//TODO 10)应用sessionWindow
WindowedStream<ItcastDataObj, String, TimeWindow> driveDataStream = keyedStream.window(
EventTimeSessionWindows.withGap(Time.minutes(15)));
1.5 指定window function,处理行程划分采样逻辑
本小节使用到谷歌guava工具包,Guava工程包含了若干被Google的 Java项目广泛依赖 的核心库,例如:集合 [collections] 、缓存 [caching] 、原生类型支持 [primitives support] 、并发库 [concurrency libraries] 、通用注解 [common annotations] 、字符串处理 [string processing] 、I/O 等等。
l 使用guava库中的Lists类:创建一个可变的,包含给定的元素的ArrayList实例;
n 非常快速创建空的list集合并且调用
l 依赖
<!-- google guava开发工具依赖-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
l 主任务中定义
drivingDstream.apply(new DriveSampleWindowFunction())
l 自定义窗口类:****DriveSampleWindowFunction****
n 输出的字段类型全部为字符串类型
n 将窗口内的所有数据进行顺序排序,获取到排序后的第一条数据和最后一条数据
n 循环遍历窗口内的所有数据,且每隔五秒钟进行一次数据的采样
u 获取到每条数据的soc、mileage(累计里程)、speed(车速)、gps(经度+维度)、terminalTime(终端时间)字段,然后将各自字段各自拼接到一个字符串变量中
n 将拼接好的字段:vin、soc、mileage、speed、gps、terminalTime,放到字符串数组中进行输出
/**
* 实现驾驶行程采样的自定义函数开发
* 针对驾驶行程(某个车辆15分钟内所有的驾驶行程进行数据的获取及格式化)
*/
public class DriveSampleWindowFunction implements WindowFunction<ItcastDataObj, String[], String, TimeWindow> {
/**
* 重写apply方法,实现驾驶行程采样逻辑
* @param key 分流的字段类型
* @param timeWindow 窗口类型
* @param iterable 某个车辆15分钟内所有的驾驶行程
* @param collector 返回数据
* @throws Exception
*/
1.6 驾驶行程采样入库(hbase)
l 主类定义
// todo 12) 驾驶行程采样如hbase库TRIPDB:trip_sample driveSampleDataStream.addSink(new TripSampleToHBaseSink(“TRIPDB:trip_sample”));
l 创建namespace,映射到hbase中新的****namespace:TRIPDB****
create_namespace 'TRIPDB’list_namespace
l 创建存储行程采样数据hbase表:trip_sample,压缩方式:****snappy****
create “TRIPDB:trip_sample”,{ NAME => ‘cf’, COMPRESSION => ‘SNAPPY’ }
l 自定义hbase的sink类:****TripSampleToHBaseSink****
/**
* 将驾驶行程采样数据写入到hbase表中,作为采样分析的数据源
*/
总结:驾驶行程采样入库,作为驾驶行程采样实时分析的数据源
- 点赞
- 收藏
- 关注作者
评论(0)