Spark 案例实操
【摘要】 Spark 学习中的案例实操,包括Top10 热门品类统计、Top10 热门品类中每个品类的 Top10 活跃 Session 统计、页面单跳转换率统计
一、基础数据
上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的4种行为:搜索,点击,下单,支付。 数据规则如下:
➢ 数据文件中每行数据采用下划线分隔数据
➢ 每一行数据表示用户的一次行为,这个行为只能是4种行为的一种
➢ 如果搜索关键字为null,表示数据不是搜索数据
➢ 如果点击的品类ID和产品ID为-1,表示数据不是点击数据
➢ 针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,id之间采用逗号分隔,如果本次不是下单行为,则数据采用null表示
➢ 支付行为和下单行为类似
二、详细字段说明
编号 | 字段名称 | 字段类型 | 字段含义 |
---|---|---|---|
1 | date | String | 用户点击行为的日期 |
2 | user_id | Long | 用户的 ID |
3 | session_id | String | Session 的 ID |
4 | page_id | Long | 某个页面的 ID |
5 | action_time | String | 动作的时间点 |
6 | search_keyword | String | 用户搜索的关键词 |
7 | click_category_id | Long | 某一个商品品类的 ID |
8 | click_product_id | Long | 某一个商品的 ID |
9 | order_category_ids | String | 一次订单中所有品类的 ID 集合 |
10 | order_product_ids | String | 一次订单中所有商品的 ID 集合 |
11 | pay_category_ids | String | 一次支付中所有品类的 ID 集合 |
12 | pay_product_ids | String | 一次支付中所有商品的 ID 集合 |
13 | city_id | Long | 城市 id |
三、需求说明1-Top10 热门品类
品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。
我们按照每个品类的点击、下单、支付的量来统计热门品类。
鞋 点击数 下单数 支付数
衣服 点击数 下单数 支付数
电脑 点击数 下单数 支付数
综合排名 = 先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
1、实现方案1
分别统计每个品类点击的次数,下单的次数和支付的次数:
(品类,点击总数)(品类,下单总数)(品类,支付总数)
object Spark01_Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
// TODO: Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
// 创建Spark上下文环境对象(连接对象)
val sc = new SparkContext(sparConf)
// 1、读取原始日志数据
val actionRdd = sc.textFile("input/user_visit_action.txt")
actionRdd.cache()
// 2、统计品类的点击数量:(品类ID,点击数量)
val clickActionRdd = actionRdd.filter(action => {
val datas = action.split("_")
datas(6) != "-1"
})
val clickCountRdd = clickActionRdd.map(
action => {
val datas = action.split("_")
(datas(6), 1)
}
).reduceByKey(_ + _).map {
case (cid, cnt) => {
(cid, (cnt, 0, 0))
}
}
// 3、统计品类的下单数量:(品类ID,下单数量)
val orderActionRdd = actionRdd.filter(action => {
val datas = action.split("_")
datas(8) != "null"
})
// orderid => 1,2,3
// 扁平化操作转为:((1,1)(2,1)(3,1))
val orderCountRdd = orderActionRdd.flatMap(action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id => (id, 1))
}).reduceByKey(_ + _).map {
case (cid, cnt) => {
(cid, (0, cnt, 0))
}
}
// 4、统计品类的支付数量:(品类ID,支付数量)
val payActionRdd = actionRdd.filter(action => {
val datas = action.split("_")
datas(10) != "null"
})
val payCountRdd = payActionRdd.flatMap(action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id => (id, 1))
}).reduceByKey(_ + _).map {
case (cid, cnt) => {
(cid, (0, 0, cnt))
}
}
// 5、将三个数据源合并在一起,进行聚合计算
val sourceRdd = clickCountRdd.union(orderCountRdd).union(payCountRdd)
val analysisRdd = sourceRdd.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
val resultRdd = analysisRdd.sortBy(_._2, ascending = false).take(10)
// 6、将结果采集到控制台打印出来
resultRdd.foreach(println)
// 关闭Spark连接
sc.stop()
}
}
2、实现方案2
一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类, (点击总数,下单总数,支付总数) )
object Spark02_Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
// TODO: Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
// 创建Spark上下文环境对象(连接对象)
val sc = new SparkContext(sparConf)
// Q:存在大量的shuffle操作(reduceByKey)
// reduceByKey 聚合算子,spark会自动优化
// 1、读取原始日志数据
val actionRdd = sc.textFile("input/user_visit_action.txt")
// 2、将数据转换结构
// 点击场合:(品类ID,(1,0,0))
// 下达场合:(品类ID,(0,1,0))
// 支付场合:(品类ID,(0,0,1))
val flatRdd = actionRdd.flatMap(action => {
val datas = action.split("_")
if (datas(6) != "-1") {
// 点击场合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
// 下达场合
val cids = datas(8).split(",")
cids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
// 支付场合
val cids = datas(10).split(",")
cids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
})
// 3、进行聚合计算
val analysisRdd = flatRdd.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
val resultRdd = analysisRdd.sortBy(_._2, ascending = false).take(10)
// 6、将结果采集到控制台打印出来
resultRdd.foreach(println)
// 关闭Spark连接
sc.stop()
}
}
3、实现方案3
使用累加器的方式聚合数据
object Spark04_Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
// TODO: Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
// 创建Spark上下文环境对象(连接对象)
val sc = new SparkContext(sparConf)
// 1、读取原始日志数据
val actionRdd = sc.textFile("input/user_visit_action.txt")
// 使用累加器
val acc = new HotCategoryAccumulator
sc.register(acc)
actionRdd.foreach(
action => {
val datas = action.split("_")
if (datas(6) != "-1") {
// 点击场合
acc.add(datas(6), "click")
} else if (datas(8) != "null") {
// 下达场合
val cids = datas(8).split(",")
cids.foreach(id => {
acc.add(id, "order")
})
} else if (datas(10) != "null") {
// 支付场合
val cids = datas(10).split(",")
cids.foreach(id => {
acc.add(id, "pay")
})
}
}
)
// 获取累加器里面的值
val accValue = acc.value.values
// 排序并获取结果
val resultRdd = accValue.toList.sortWith((left, right) => {
if (left.clickCnt > right.clickCnt) {
true
} else if (left.clickCnt == right.clickCnt) {
if (left.orderCnt > right.orderCnt) {
true
} else if (left.orderCnt == right.orderCnt) {
left.payCnt > right.payCnt
} else {
false
}
} else {
false
}
}).take(10)
// 6、将结果采集到控制台打印出来
resultRdd.foreach(println)
// 关闭Spark连接
sc.stop()
}
case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)
/**
* 自定义累加器
* 1、继承AccumulatorV2,定义泛型
* IN:(品类ID,行为类型)
* OUT:mutable.Map[String, HotCategory]
*/
class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {
private val hcMap = mutable.Map[String, HotCategory]()
override def isZero: Boolean = {
hcMap.isEmpty
}
override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
new HotCategoryAccumulator
}
override def reset(): Unit = {
hcMap.clear()
}
override def add(v: (String, String)): Unit = {
val cid = v._1
val actionType = v._2
val category = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
if (actionType == "click") {
category.clickCnt += 1
} else if (actionType == "order") {
category.orderCnt += 1
} else if (actionType == "pay") {
category.payCnt += 1
}
hcMap.update(cid, category)
}
override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
val map1 = this.hcMap
val map2 = other.value
map2.foreach {
case (cid, hc) => {
val category = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
category.clickCnt += hc.clickCnt
category.orderCnt += hc.orderCnt
category.payCnt += hc.payCnt
map1.update(cid, category)
}
}
}
override def value: mutable.Map[String, HotCategory] = {
hcMap
}
}
}
四、需求说明2-Top10 热门品类中每个品类的 Top10 活跃 Session 统计
Top10热门品类中每个品类的Top10活跃Session统计:在需求一的基础上,增加每个品类用户session的点击统计
实现方案
object Spark05_Req2_HotCategoryTop10SessionAnalysis {
def main(args: Array[String]): Unit = {
// TODO: Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
// 创建Spark上下文环境对象(连接对象)
val sc = new SparkContext(sparConf)
// 1、读取原始日志数据
val actionRdd = sc.textFile("input/user_visit_action.txt")
actionRdd.cache()
// 采集热门品类前10的数据
val top10Ids = top10Category(actionRdd)
// 过滤原始数据,保留点击和前10品类ID
val filterActionRdd = actionRdd.filter(
action => {
val datas = action.split("_")
if (datas(6) != "-1") {
top10Ids.contains(datas(6))
} else {
false
}
}
)
// 2、根据品类ID和sessionId进行点击量的统计
val reduceRdd = filterActionRdd.map(action => {
val datas = action.split("_")
((datas(6), datas(2)), 1)
}).reduceByKey(_ + _)
// 3、将统计的结果进行结构的转换
val mapRdd = reduceRdd.map {
case ((cid, sid), sum) => {
(cid, (sid, sum))
}
}
// 4、相同的品类进行分组
val groupRdd = mapRdd.groupByKey()
// 生成返回结果
val resultRdd = groupRdd.mapValues(iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
})
resultRdd.foreach(println)
// 关闭Spark连接
sc.stop()
}
def top10Category(actionRdd: RDD[String]): Array[String] = {
val flatRdd = actionRdd.flatMap(action => {
val datas = action.split("_")
if (datas(6) != "-1") {
// 点击场合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
// 下达场合
val cids = datas(8).split(",")
cids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
// 支付场合
val cids = datas(10).split(",")
cids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
})
// 3、进行聚合计算
val analysisRdd = flatRdd.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
analysisRdd.sortBy(_._2, ascending = false).take(10).map(_._1)
}
}
五、需求说明3-页面单跳转换率统计
计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳, 7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。
实现方案
object Spark07_Req3_PageflowAnalysis {
def main(args: Array[String]): Unit = {
// TODO: Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
// 创建Spark上下文环境对象(连接对象)
val sc = new SparkContext(sparConf)
// 1、读取原始日志数据
val actionRdd = sc.textFile("input/user_visit_action.txt")
val actionDataRdd = actionRdd.map(action => {
val datas = action.split("_")
UserVisitAction(
datas(0), datas(1).toLong, datas(2), datas(3).toLong, datas(4), datas(5), datas(6).toLong,
datas(7).toLong, datas(8), datas(9), datas(10), datas(11), datas(12).toLong
)
})
actionDataRdd.cache()
// TODO:对指定的页面连续跳转进行统计
val ids = List(1L, 2L, 3L, 4L, 5L, 6L, 7L)
val okFlowIds = ids.zip(ids.tail)
// TODO: 计算分母
val pageIdToCountMap = actionDataRdd.filter(action => {
ids.init.contains(action.page_id)
}).map(action => {
(action.page_id, 1L)
}).reduceByKey(_ + _).collect().toMap
// TODO:计算分子
// 根据Session进行分组
val sessionRdd = actionDataRdd.groupBy(_.session_id)
// 分组后,根据访问时间进行排序,默认升序
val mvRdd = sessionRdd.mapValues(
iter => {
// 对各组数据按照访问时间进行排序
val sortList = iter.toList.sortBy(_.action_time)
// 取得排序后的页面ID集合
val flowIds = sortList.map(_.page_id)
// 通过拉链操作将数据转化
// 【1,2,3,4】转成【(1,2),(2,3),(3,4)】
val pageFlowIds = flowIds.zip(flowIds.tail)
// 将不合法的页面跳转进行过滤
// 再次转换
// 【(1,2),(2,3),(3,4)】转成【((1,2),1),((2,3),1),((3,4),2)】
pageFlowIds.filter(t => {
okFlowIds.contains(t)
}).map(t => (t, 1))
}
)
// 把【((1,2),1),((2,3),1),((3,4),2)】 list转成单个的元素
val flatRdd = mvRdd.map(_._2).flatMap(list => list)
// 对所有单个元素做聚合((1,2),sum)
val dataRdd = flatRdd.reduceByKey(_ + _)
// 计算单挑转换率
// 分子除以分母
dataRdd.foreach {
case ((pageId1, pageId2), sum) => {
val lon = pageIdToCountMap.getOrElse(pageId1, 0L)
println(s"页面${pageId1}跳转到页面${pageId2}单跳转换率为" + (sum.toDouble / lon))
}
}
// 关闭Spark连接
sc.stop()
}
/**
* 用户访问动作表
*
* @param date 用户点击行为的日期
* @param user_id 用户的 ID
* @param session_id Session 的 ID
* @param page_id 某个页面的 ID
* @param action_time 动作的时间点
* @param search_keyword 用户搜索的关键词
* @param click_category_id 某一个商品品类的 ID
* @param click_product_id 某一个商品的 ID
* @param order_category_ids 一次订单中所有品类的 ID 集合
* @param order_product_ids 一次订单中所有商品的 ID 集合
* @param pay_category_ids 一次支付中所有品类的 ID 集合
* @param pay_product_ids 一次支付中所有商品的 ID 集合
* @param city_id 城市 id
*/
case class UserVisitAction(date: String, user_id: Long, session_id: String, page_id: Long, action_time: String,
search_keyword: String, click_category_id: Long, click_product_id: Long,
order_category_ids: String, order_product_ids: String, pay_category_ids: String,
pay_product_ids: String, city_id: Long)
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)