数据仓库行为域ODS开发
行为域ODS开发
数据仓库的经典定义:
数据仓库(Data Warehouse)是一个面向主题的(Subject Oriented)、集成的(Integrated)、相对稳定的(Non-Volatile)、反映历史变化(Time Variant)的数据集合,用于支持管理决策(Decision Making Support)。
一、行为域ODS层详细设计
1 ODS层功能
ODS:操作数据层
主要作用:直接映射操作数据(原始数据),数据备份;
建模方法:与原始数据结构保持完全一致
存储周期:相对来说,存储周期较短;视数据规模,增长速度,以及业务的需求而定;对于埋点日志数据ODS层存储,通常可以选择3个月或者半年;
-
模拟生成日志数据
1.将genlog.sh、log-generator-jar-with-dependencies.jar上传到/root目录下
2.在/root目录下创建一个moni_data文件夹
3.sh genlog.sh user 先生成user用户日志数据
4.sh genlog.sh log 在生成用户的行为日志数据 【最终需要的日志数据/root/moni_data/app.access.log.2022-11-08】
了解日志数据包含的字段
{
"account": "第五园", -- 用户名
"appid": "app1", -- app的唯一标识
"appversion": "8.1", -- app的版本号
"carrier": "360移动", -- 运营商
"deviceid": "OHFMEDDHWQGQ", -- 设备编号
"devicetype": "GALAXY-6", -- 设备类型(手机型号)
"eventid": "thumbUp", -- 事件类型ID
"ip": "34.142.34.41", -- 外网ip地址
"latitude": 27.48026698890073, -- 纬度
"longitude": 107.10385540147524, -- 经度
"nettype": "WIFI", -- 网络类型 4g 5g wifi
"osname": "android", -- 操作系统
"osversion": "8.5", -- 操作系统版本号
"properties": { -- 属性 不同的eventid事件,拥有不同的属性
"itemId": "00768",
"refUrl": "/teachers/tea0838.html",
"pageId": "tea0927",
"url": "/teachers/tea0927.html"
},
"releasechannel": "木蚂蚁安卓应用市场", -- 应用市场
"resolution": "2048*768", -- 分辨率
"sessionid": "kvmqpnwn", -- 会话ID
"timestamp": 1667867494026 -- 时间戳
}
二、行为域ODS层开发需求
1.存储规划
数据类型 | 输入路径(HDFS目录) | 目标位置(HIVE表) |
---|---|---|
app端埋点日志 | /logdata/app/2022-11-08/ | ods.app_event_log 分区:2022-11-08 |
1.在HDFS上创建一个目录/logdata/app/2022-11-08 hdfs dfs -mkdir -p /logdata/app/2023-06-22
2.使用put命令,将日志上传到HDFS的/logdata/app/2022-11-08目录下 hdfs dfs -put /root/moni_data/app.access.log.2023-06-22 /logdata/app/2023-06-22
2.入库要求
-
原始日志格式
普通文本文件,JSON数据格式,导入hive表后,要求可以很方便地select各个字段
-
分区表
-
外部表
3 hive建表加强
Json数据的hive解析
由于原始数据是普通文本文件,而文件内容是json格式的一条一条记录
在创建hive表结构进行映射时,有两种选择:
-
将数据视为无结构的string
-
将数据按json格式进行映射(这需要JsonSerde 的支持)
本项目采用方案2来进行建表映射
使用HIVE内置JsonSerDe
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
官方文档: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-JSON
4.日志数据建表
app事件日志建表
create database ods;
drop table if exists ods.app_event_log;
create table ods.app_event_log(
account string,
appid string,
appversion string,
carrier string,
deviceid string,
devicetype string,
eventid string,
ip string,
latitude double,
longitude double,
nettype string,
osname string,
osversion string,
properties Map<String,String>,
releasechannel string,
resolution string,
sessionid string,
`timestamp` bigint
)
partitioned by(dt string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
STORED AS TEXTFILE;
注意:使用 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe' 这种方式 需要json中的属性名和表中的字段名保持一致
5.入库命令
load data inpath '/logdata/app/2022-11-08' into table ods.app_event_log partition(dt='2022-11-08')
6.脚本开发
01.日志数据导入ODS层.sh
#! /bin/bash
dt1=$1
if [ 'x'$1 == 'x' ];then
dt1=$(date -d'-1 day' +%Y-%m-%d)
fi
echo $dt1
#hive -e "sql"
hive -e "load data inpath '/logdata/app/$dt1' into table ods.app_event_log partition(dt='$dt1')"
三、行为域DWD开发
1. 概要设计
1.存储规划
数据类型 | 源表 | 目标表 |
---|---|---|
app端埋点日志 | ods.event_app_log | dwd.event_log_detail |
2 技术选型
由于本层数据ETL的需求较为复杂,用hive sql实现非常困难
因而此环节将开发spark程序来实现
2. 需求分析
1.清洗过滤
1,去除json数据体中的废弃字段(前端开发人员在埋点设计方案变更后遗留的无用字段)
2,过滤掉json格式不正确的(脏数据)
3,过滤掉日志中缺少关键字段(deviceid/properties/eventid/sessionid 缺任何一个都不行)的记录
4,过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达)
a)、创建数据库和一张表
create database tmp; -- 临时层 ods层数据经过处理先写入tmp,因为还没有完全处理成dwd层希望的结果
-- 临时层:数据清洗之后存储event_log_washed
create table tmp.event_log_washed(
account string,
appid string,
appversion string,
carrier string,
deviceid string,
devicetype string,
eventid string,
ip string,
latitude double,
longitude double,
nettype string,
osname string,
osversion string,
properties Map<String,String>,
releasechannel string,
resolution string,
sessionid string,
`timestamp` bigint
)
partitioned by(dt string)
b)、创建一个类AppLogWash,读取ods.app_event_log表的数据写入tmp.event_log_washed
① 添加依赖
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.3</version>
</dependency>
② 在resources目录下创建一个application.properties
#local.run为true代表在本地测试,否则在集群测试
local.run=false
③ 读取配置文件ConfigUtils工具类
package cn.yh.utils
import com.typesafe.config.{Config, ConfigFactory}
/**
* 1.在pom.xml添加一个依赖
* 2.在resources目录下创建一个application.properties配置文件
* 3.编写这个工具类
*/
object ConfigUtils {
//加载resources目录下的application.properties配置文件
private val config: Config = ConfigFactory.load()
//根据配置文件中的key获取value
val LOCAL_RUN:Boolean = config.getBoolean("local.run")
}
④ 获取SparkSession对象的工具类
package cn.yh.utils
import org.apache.spark.sql.SparkSession
object SparkUtils {
def getSparkSession(appName:String) = {
if(ConfigUtils.LOCAL_RUN){
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(appName)
.enableHiveSupport()
.getOrCreate()
spark
}else{
val spark: SparkSession = SparkSession.builder()
.master("spark://hadoop10:7077")
.appName(appName)
.enableHiveSupport()
.getOrCreate()
spark
}
}
}
⑤ 数据清洗类
package cn.yh.ods_etl
import cn.yh.utils.SparkUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
object AppLogWash {
def main(args: Array[String]): Unit = {
if(args.length == 0){
println("请提供时间参数")
System.exit(0)
}
val time = args(0)
println(s"参数1 ${time}")
val spark: SparkSession = SparkUtils.getSparkSession("数据清洗-AppLogWash")
val sql =
s"""
|
|insert overwrite table tmp.event_log_washed
|partition(dt='${time}')
|select
| account ,
| appid ,
| appversion ,
| carrier ,
| deviceid ,
| devicetype ,
| eventid ,
| ip ,
| latitude ,
| longitude ,
| nettype ,
| osname ,
| osversion ,
| properties ,
| releasechannel ,
| resolution ,
| sessionid ,
| `timestamp`
|from ods.app_event_log
|where dt = '${time}'
| and deviceid is not null and trim(deviceid) != ''
| and eventid is not null and trim(eventid) != ''
| and properties is not null and size(properties) != 0
| and sessionid is not null and trim(sessionid) != ''
| and unix_timestamp(to_utc_timestamp('${time}','GMT+8'),'yyyy-MM-dd')*1000 <= `timestamp`
| and unix_timestamp(to_utc_timestamp(date_add('${time}',1),'GMT+8'),'yyyy-MM-dd')*1000 > `timestamp`
|
|""".stripMargin
spark.sql(sql)
spark.stop()
}
}
⑥ 编写【02.数据清洗.sh】脚本,测试AppLogWash类
[root@hadoop10 app]# cat /root/shell/02.数据清洗.sh
#! /bin/bash
dt=$1
if [ 'x'$dt == 'x' ];then
dt=$(date -d'-1 day' +'%Y-%m-%d')
fi
echo $dt
spark-submit --master spark://hadoop10:7077 --class cn.yh.ods_etl.AppLogWash /opt/app/spark-dw.jar $dt
[root@hadoop10 app]# sh /root/shell/02.数据清洗.sh 2022-11-08
2.SESSION分割
1.什么是session(会话) 浏览器访问一个系统,需要登陆,登陆之后点击收藏、订单、浏览记录则不需要再登陆,浏览器默认的会话时间是30分钟 手机APP(客户端)往往不需要像浏览器这样频繁的登陆,因为sessionid(token、票据)存活时间比较长,所以手机APP一次会话时间比较长 手机APP的一次会话往往时间较长,会超出30分钟
2.session切割? 因为APP的session往往持续时间比较长,不像浏览器那样,有固定的30分钟,所以对APP上的行为进行会话内的分析是不准确的 当一个行为发生时间相比较上一个行为的时间超过10分钟,则认为用户刚才将APP后台或者退出了,则从该行为开始属于一个新的会话
设备编号 事件名称 时间 sessionid
deviceid1 event1 2022-10-14 10:10:10 abc
deviceid1 event2 2022-10-14 10:10:40 abc
deviceid1 event3 2022-10-14 10:13:40 abc
deviceid1 event4 2022-10-14 10:16:40 abc
deviceid1 event5 2022-10-14 10:19:40 abc
deviceid1 event6 2022-10-14 10:25:40 abc
deviceid1 event7 2022-10-14 11:46:40 abc
deviceid1 event8 2022-10-14 11:49:40 abc
进行session切割之后的数据
设备编号 事件名称 时间 sessionid newsessionid
deviceid1 event1 2022-10-14 10:10:10 abc abc-0
deviceid1 event2 2022-10-14 10:10:40 abc abc-0
deviceid1 event3 2022-10-14 10:13:40 abc abc-0
deviceid1 event4 2022-10-14 10:16:40 abc abc-0
deviceid1 event5 2022-10-14 10:19:40 abc abc-0
deviceid1 event6 2022-10-14 10:25:40 abc abc-0
deviceid1 event7 2022-10-14 11:46:40 abc abc-1
deviceid1 event8 2022-10-14 11:49:40 abc abc-1
总结:通过对用户的行为按照时间进行session会话的切割,可以对设备的行为按照会话级别粒度进行分析,例如一个用户在一次会话中从哪进入了产品,又从哪退出了产品,在一次会话中用户的停留时长,在一次会话中用户访问了多少个页面等
a) 创建一张表,存储session切割之后的结果
create table tmp.event_log_splited(
account string,
appid string,
appversion string,
carrier string,
deviceid string,
devicetype string,
eventid string,
ip string,
latitude double,
longitude double,
nettype string,
osname string,
osversion string,
properties Map<String,String>,
releasechannel string,
resolution string,
sessionid string,
`timestamp` bigint,
newsessionid string
)
partitioned by(dt string)
b) 创建一个类AppLogSessionSplit,通过sparksql完成session会话切割操作
package cn.yh.ods_etl
import cn.yh.utils.SparkUtils
import org.apache.spark.sql.SparkSession
object AppLogSessionSplit {
def main(args: Array[String]): Unit = {
if(args.length == 0){
println("请输入时间,格式为(yyyy-mm-dd)")
System.exit(0)
}
val time = args(0)
val spark: SparkSession = SparkUtils.getSparkSession("session切割")
spark.sql(
s"""
|
|insert overwrite table tmp.event_log_splited
|partition (dt='${time}')
|select account ,
| appid ,
| appversion ,
| carrier ,
| deviceid ,
| devicetype ,
| eventid ,
| ip ,
| latitude ,
| longitude ,
| nettype ,
| osname ,
| osversion ,
| properties ,
| releasechannel ,
| resolution ,
| sessionid ,
| `timestamp` ,
| concat(sessionid,'-',sum(ts) over(partition by sessionid order by `timestamp`)) newsessionid
|from(
|select
| *,
| if( (`timestamp`-lag(`timestamp`,1,`timestamp`) over(partition by sessionid order by `timestamp`))/1000/60 > 10,1,0 ) ts
| from tmp.event_log_washed t1
| where dt = '${time}'
|)t2
|
|""".stripMargin)
spark.stop()
}
}
3. 数据集成(本质上是做维度退化)
1,将日志中的GPS经纬度坐标解析成省、市、县(区)信息;(为了方便后续的地域维度分析)
2,将日志中的IP地址解析成省、市、县(区)信息;(为了方便后续的地域维度分析)
注:app日志和wxapp日志,有采集到的用户事件行为时的所在地gps坐标信息
web日志则无法收集到用户的gps坐标,但可以收集到ip地址
gps坐标可以表达精确的地理位置,而ip地址只能表达准确度较低而且精度较低的地理位置
地理位置解析
-
高德逆地理位置
参考:高德开发者平台
https://lbs.amap.com/api/webservice/summary/
使用scala代码完成上述接口的查询工作
① 添加依赖
<dependency>
<groupId>org.scalaj</groupId>
<artifactId>scalaj-http_2.11</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.6.8</version>
</dependency>
② 测试
package test
import scalaj.http.{Http, HttpRequest, HttpResponse}
object Test_高德逆地理位置测试 {
def main(args: Array[String]): Unit = {
//1.创建request请求对象,发送url请求
var request: HttpRequest = Http("https://restapi.amap.com/v3/geocode/regeo")
request = request.param("key","自己申请的key")
.param("location","104.679284,31.534707")
//2.获取响应对象,并且将响应对象中body里面的内容转换成字符串
val response: HttpResponse[String] = request.asString
//3.获取body里面的内容
val body: String = response.body
//4.使用hutool解析body里面的省市区
//{"status":"1","regeocode":{"addressComponent":{"city":"绵阳市","province":"四川省","adcode":"510703","district":"涪城区","towncode":"510703103000","streetNumber":{"number":"13正","location":"104.681946,31.533269","direction":"东南","distance":"298.728","street":"兴龙街"},"country":"中国","township":"青义镇","businessAreas":[[]],"building":{"name":[],"type":[]},"neighborhood":{"name":[],"type":[]},"citycode":"0816"},"formatted_address":"四川省绵阳市涪城区青义镇G5京昆高速"},"info":"OK","infocode":"10000"}
}
}
-
根据高德逆地理位置,解析经纬度解析省市区写入DWD层
create database dwd;
create table dwd.event_log_detail(
account string,
appid string,
appversion string,
carrier string,
deviceid string,
devicetype string,
eventid string,
ip string,
latitude double,
longitude double,
nettype string,
osname string,
osversion string,
properties Map<String,String>,
releasechannel string,
resolution string,
sessionid string,
`timestamp` bigint,
newsessionid string,
province string,
city string,
district string
)
partitioned by(dt string)
package cn.yh.ods_etl
import cn.hutool.json.{JSONObject, JSONUtil}
import cn.yh.utils.SparkUtils
import scalaj.http.{Http, HttpRequest, HttpResponse}
object AppLogToDWD {
def main(args: Array[String]): Unit = {
val spark = SparkUtils.getSparkSession("根据经纬度查询地理位置")
//自定义函数,根据经纬度查询省市区
//三个参数:经度/纬度/标识符号(province、city、district)
spark.udf.register("get_cityinfo",(longitude:String,latitude:String,s1:String)=>{
var request: HttpRequest = Http("https://restapi.amap.com/v3/geocode/regeo")
request = request.param("key","e2cc8f0f89145aa2f9b3da7ee99cd91b")
.param("location",longitude+","+latitude)
val httpResponse: HttpResponse[String] = request.asString
val jSONObject: JSONObject = JSONUtil.parseObj(httpResponse.body)
val status: String = jSONObject.getStr("status") //返回结果状态值 返回值为0或1,0表示请求失败;1表示请求成功。
var name = ""
if("1".equals(status)){
val jSONObject1 = jSONObject.getJSONObject("regeocode").getJSONObject("addressComponent")
if("province".equals(s1)){
name = jSONObject1.getStr("province")
}else if("city".equals(s1)){
name = jSONObject1.getStr("city")
}else if("district".equals(s1)){
name = jSONObject1.getStr("district")
}
}else{
name = "未知"
}
name
})
spark.sql(
"""
|
|insert overwrite table dwd.event_log_detail
|partition(dt = '2022-11-08')
|select account ,
| appid ,
| appversion ,
| carrier ,
| deviceid ,
| devicetype ,
| eventid ,
| ip ,
| latitude ,
| longitude ,
| nettype ,
| osname ,
| osversion ,
| properties ,
| releasechannel,
| resolution ,
| sessionid ,
| `timestamp` ,
| newsessionid ,
| get_cityinfo(round(longitude,6),round(latitude,6),'province') province,
| get_cityinfo(round(longitude,6),round(latitude,6),'city') city,
| get_cityinfo(round(longitude,6),round(latitude,6),'district') district
|from tmp.event_log_splited where dt = '2022-11-08' limit 1
|
|""".stripMargin)
spark.stop()
}
}
注意:上述代码会造成高德API接口调用次数过多
-
计算经纬度的GeoHash
GEOHASH编码介绍
Geohash编码是一种地理位置编码技术,它可将一个gps坐标(含经、纬度)点,转化为一个字符串;
wx3y5697
wx3y5694
通过编码后得到的字符串,表达的是:包含被编码gps坐标点的一个矩形范围;
GEOHASH编码原理
在地球经纬度范围内,不断通过二分来划分矩形范围,通过观察gps坐标点所落的范围,来反复生成0/1二进制码。
在满足精度要求后,将所得的二进制编码通过base32编码技术转成字符串码,如下所示:
GEOHASH码的精度
字符串长度越长,表达的精度越高,矩形范围越小,越逼近原gps坐标点;
相反,长度越短,表达的精度越低,矩形范围越大;
geohash码的精确度对应表格:
GEOHASH编码工具包
gps坐标 转码成 geohash编码,这个算法不需要自己手写,有现成的工具包
1.添加依赖
<dependency>
<groupId>ch.hsr</groupId>
<artifactId>geohash</artifactId>
<version>1.3.0</version>
</dependency>
2.根据经纬度,生成一个GeoHash值
/**
* 参数1:纬度
* 参数2:经度
* 参数3:生成geohash字符串的长度
*/
val str1: String = GeoHash.geoHashStringWithCharacterPrecision(34.795013, 113.54602, 5)
println(str1)
将mysql_testdata.sql导入mysql数据库下,读取mysql数据库t_md_areas表中得数据,获取省市区街道、经纬度通过GeoHash处理经纬度,得到GeoHash编码,将带有GeoHash编码的结果写入Hive数仓
1.使用spark读取mysql中t_md_areas表中数据,存到DataFrame中
2.spark自定义函数,参数是经纬度,返回值是GeoHash编码
3.创建DataFrame对应临时表,进行自连接查询,获取省市区街道、经纬度,并且使用自定义函数处理经纬度获取GeoHash编码
4.将上述结果写入hive表中dim.area_geo (id编号、省、市、区、街道、经度、纬度、GeoHash编码)
① 创建dim层 create database dim;
② 计算GeoHash
package cn.yh.ods_etl
import java.util.Properties
import ch.hsr.geohash.GeoHash
import cn.yh.utils.{ConfigUtils, SparkUtils}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 读取mysql数据库中t_md_areas表的数据,并且计算街道经纬的geohash编码
*/
object ComputerGeoHash {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkUtils.getSparkSession("计算经纬度的GeoHash编码")
//1.spark读取mysql表数据
val prop = new Properties()
prop.setProperty("driver",ConfigUtils.DRIVER)
prop.setProperty("user",ConfigUtils.USERNAME)
prop.setProperty("password",ConfigUtils.PASSWORD)
val df: DataFrame = spark.read.jdbc(ConfigUtils.URL, "t_md_areas", prop)
//2.自定义单行函数,根据经纬度统计geo_hash
spark.udf.register("get_geohash",(latitude:String,longitude:String,n:Int)=>{
val str: String = GeoHash.geoHashStringWithCharacterPrecision(latitude.toDouble, longitude.toDouble, n)
str
})
//3.自连接,查询省市区、经纬度、5位长度geohash、6位长度的geohash
df.createTempView("t_md_areas")
val sql = """
|select t1.id,
| t1.AREANAME prov,t2.AREANAME city,t3.AREANAME region,
| t4.AREANAME street,t4.BD09_LAT latitude,t4.BD09_LNG longitude,
| get_geohash(t4.BD09_LAT,t4.BD09_LNG,5) geohash5,
| get_geohash(t4.BD09_LAT,t4.BD09_LNG,6) geohash6
|from t_md_areas t1
|inner join t_md_areas t2 on t1.id = t2.parentid and t1.parentid = 0
|inner join t_md_areas t3 on t2.id = t3.parentid
|inner join t_md_areas t4 on t3.id = t4.parentid
|where t4.BD09_LAT is not null and t4.BD09_LNG is not null
|
|""".stripMargin
val df2: DataFrame = spark.sql(sql)
//4.将df2的数据写入hive表中
//dim数据库需要手动提前创建,saveAsTable底层会自动创建area_geo表
df2.write.saveAsTable("dim.area_geo")
spark.stop()
}
}
- 点赞
- 收藏
- 关注作者
评论(0)