Spark Core项目实战(2) | Top10热门品类中每个品类的 Top10 活跃 Session 统计
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—
不温不火
,本意是希望自己性情温和
。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/
项目代码博主已经打包到Github需要的可以自行下载:https://github.com/459804692/spark0729
一. 需求分析
对于排名前 10 的品类,分别获取每个品类点击次数排名前 10 的 sessionId。(注意: 这里我们只关注点击次数, 不关心下单和支付次数)
这个就是说,对于 top10 的品类,每一个都要获取对它点击次数排名前 10 的 sessionId。
这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的 session 的行为。
二. 思路
- 过滤出来 category Top10的日志
- 需要用到需求1的结果, 然后只需要得到categoryId就可以了
- 转换结果为 RDD[(categoryId, sessionId), 1] 然后统计数量 => RDD[(categoryId, sessionId), count]
- 统计每个品类 top10. => RDD[categoryId, (sessionId, count)] => RDD[categoryId, Iterable[(sessionId, count)]]
- 对每个 Iterable[(sessionId, count)]进行排序, 并取每个Iterable的前10
- 把数据封装到 CategorySession 中
三. 项目实现
1. bean类
- 1. 创建SessionInfo
case class SessionInfo(sessionId:String, count: Long) extends Ordered[SessionInfo]{ // 按照降序排列 // else if (this.count == that.count) 0 这个不能加,否则会去重 override def compare(that: SessionInfo): Int = if (this.count >that.count) -1 else 1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
2. 具体实现
- 1. 主类实现
package com.buwenbuhuo.spark.core.project.app
import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
**
*@author 不温卜火
**
* @create 2020-07-29 12:18
**
* MyCSDN :https://buwenbuhuo.blog.csdn.net/
*/
object ProjectApp {
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ProjectAPP").setMaster("local[2]") val sc: SparkContext = new SparkContext(conf) // 把数据从文件读出来 val sourceRDD: RDD[String] = sc.textFile("D:/user_visit_action.txt") // 把数据封装好(封装到样例类中)
// sourceRDD.collect.foreach(println) val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => { val fields: Array[String] = line.split("_") UserVisitAction( fields(0), fields(1).toLong, fields(2), fields(3).toLong, fields(4), fields(5), fields(6).toLong, fields(7).toLong, fields(8), fields(9), fields(10), fields(11), fields(12).toLong) }) // 需求1: val categoryTop10: List[CategoryCountInfo] = CategoryTopApp.calcCatgoryTop10(sc , userVisitActionRDD) // 需求2:top10品类的top10session CategorySessionTopApp.statCategorySessionTop10(sc,categoryTop10,userVisitActionRDD) // 关闭项目(sc) sc.stop() }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 2. 解决方案1(原始方法,没任何优化)
package com.buwenbuhuo.spark.core.project.app
import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, SessionInfo, UserVisitAction}
import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
/**
**
*
* @author 不温卜火
* *
* @create 2020-07-29 20:07
**
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*
*/
object CategorySessionTopApp { /* 1. 最原始方法 ,没有任何优化,方案1 */ def statCategorySessionTop10(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={ // 1. 过滤出来只包含 top10 品类id的那些点击记录 // 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容 val cids: List[Long] = categoryTop10.map(_.categoryId.toLong) val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id)) // 2.每个品类top10session的计算 // 2.1 先map出来需要字段 val cidSidAndOne: RDD[((Long, String), Int)] = filteredUserVisitActionRDD.map(action =>((action.click_category_id,action.session_id),1)) // 2.2 做聚合操作 得到RDD[((cid,sid),count)] val cidSidAndCount: RDD[((Long, String), Int)] = cidSidAndOne.reduceByKey(_ + _) // map 出来想要的数据结构 val cidAndSidCount: RDD[(Long, (String, Int))] = cidSidAndCount.map { case ((cid, sid), count) => (cid, (sid, count)) } // 2.3 分组 排序取Top10 val cidAandSidCountItRDD: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey() // 2.4 对每个值排序取top10 // 解法1: 最原始的写法 最不好的写法,it.toList一时爽,不过最终可能会因为内存原因而爆掉 val result = cidAandSidCountItRDD mapValues((it:Iterable[(String,Int)]) =>{ // 只能使用scala排序,scala排序必须把所有数据全部加载到内存才能排。 // 如果数据量很小可以 ,数据量大就不行了 it.toList.sortBy(-_._2).take(10) }) result.collect.foreach(println) }
}
/*
计算热门session口径:看每个session的点击记录
1. 过滤出来只包含 top10 品类id的那些点击记录
2. 每个品类top10session的计算 => RDD[(品类id, sessionId))] map => RDD[(品类id, sessionId), 1)] reduceByKey => RDD[(品类id, sessionId), count)] map => RDD[品类id, (sessionId, count)] groupByKey RDD[品类id, Iterator[(sessionId, count)]] map内部,对iterator排序,取前10
----
*/
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 3. 解决方案2
/* 2. 解决方案2: 每次排序一个cid,需要排10次 */ def statCategorySessionTop10_2(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={ // 1. 过滤出来只包含 top10 品类id的那些点击记录 // 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容 val cids: List[Long] = categoryTop10.map(_.categoryId.toLong) val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id)) // 方案2写法1: /* // 2. 需要排10次 cids.foreach(cid =>{ // 2.1 先过滤出来点击id是cid的那些记录 val cidUserVisitActionRDD: RDD[UserVisitAction] = filteredUserVisitActionRDD.filter(_.click_category_id == cid) // 2.2 聚合 val result: Map[Long, List[(String, Int)]] = cidUserVisitActionRDD .map(action => ((action.click_category_id, action.session_id), 1)) .reduceByKey(_ + _) .map { case ((cid, sid), count) => (cid, (sid, count)) } .sortBy(- _._2._2) .take(10) .groupBy(_ . _1) .map{ case (cid,arr) => (cid,arr.map(_._2).toList) } println(result.toMap) }) */ // 方案2写法2: // 2. 需要排10次 val temp: List[Map[Long, List[(String, Int)]]] = cids.map(f = cid => { // 2.1 先过滤出来点击id是cid的那些记录 val cidUserVisitActionRDD: RDD[UserVisitAction] = filteredUserVisitActionRDD.filter(_.click_category_id == cid) // 2.2 聚合 val r: Map[Long, List[(String, Int)]] = cidUserVisitActionRDD .map(action => ((action.click_category_id, action.session_id), 1)) .reduceByKey(_ + _) .map { case ((cid, sid), count) => (cid, (sid, count)) } .sortBy(-_._2._2) .take(10) .groupBy(_._1) .map { case (cid, arr) => (cid, arr.map(_._2).toList) } r }) val result: List[(Long, List[(String, Int)])] = temp.flatMap(map => map) result.foreach(println) }
/*
使用scala的排序,会导致内存溢出
问题解决方案: 方案2: 1. 使用spark排序,来解决问题 2. spark的排序是整体排序。 不能直接使用spark排序 3. 10个品类id,我就使用spark的排序功能排10次 优点: 一定能完成,不会oom 缺点: 要起10个Job,排序10次 */
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 4. 解决方案3
/* 3. 解决方案3: 找一个可以排序的集合,然后时刻保持这个集合中只有10最大的元素 */ def statCategorySessionTop10_3(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={ // 1. 过滤出来只包含 top10 品类id的那些点击记录 // 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容 val cids: List[Long] = categoryTop10.map(_.categoryId.toLong) val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id)) // 2.每个品类top10session的计算 // 2.1 先map出来需要字段 val cidSidAndOne: RDD[((Long, String), Int)] = filteredUserVisitActionRDD.map(action =>((action.click_category_id,action.session_id),1)) // 2.2 做聚合操作 得到RDD[((cid,sid),count)] val cidSidAndCount: RDD[((Long, String), Int)] = cidSidAndOne.reduceByKey(_ + _) // map 出来想要的数据结构 val cidAndSidCount: RDD[(Long, (String, Int))] = cidSidAndCount.map { case ((cid, sid), count) => (cid, (sid, count)) } // 2.3 分组 排序取Top10 val cidAandSidCountItRDD: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey() // 2.4 对每个值排序取top10 val result = cidAandSidCountItRDD mapValues((it:Iterable[(String,Int)]) =>{ // 不要把Iterable直接转成list再排序 var set = mutable.TreeSet[SessionInfo]() it.foreach{ case (sid,count) => val info: SessionInfo = SessionInfo(sid, count) set += info if(set.size > 10) set = set.take(10) } set.toList }) // 起1 job result.collect.foreach(println) Thread.sleep(1000000) }
/* 方案3: 内存溢出,iterable => 转换list 最终的目的top10 搞一个集合,这集合中永远只保存10个元素,用于最大的10个元素 先聚合,聚合后分组,分组内做了排序(用了自动排序的功能集合TreeSet) 优点: 一定可以完成,也不会oom,job也是只有一个job 坏处: 做了两次shuffle,效率比较低下 */
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 5. 解决方案4
/* 4. 解决方案4: 去掉groupBy,减少shuffle的次数 */ def statCategorySessionTop10_4(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={ // 1. 过滤出来只包含 top10 品类id的那些点击记录 // 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容 val cids: List[Long] = categoryTop10.map(_.categoryId.toLong) val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id)) // 2.每个品类top10session的计算 // 2.1 先map出来需要字段 val cidSidAndOne: RDD[((Long, String), Int)] = filteredUserVisitActionRDD.map(action =>((action.click_category_id,action.session_id),1)) // 2.2 做聚合操作 得到RDD[((cid,sid),count)] val cidSidAndCount: RDD[((Long, String), Int)] = cidSidAndOne.reduceByKey(new CategorySessionPartitioner(cids),_ + _) // 2.3 cidSidAndCount 执行mapPartitions val result: RDD[(Long, List[SessionInfo])] = cidSidAndCount.mapPartitions(it => { // 不要把Iterable直接转成list再排序 var set = mutable.TreeSet[SessionInfo]() var categoryId = -1L it.foreach { case ((cid, sid), count) => categoryId = cid val info: SessionInfo = SessionInfo(sid, count) set += info if (set.size > 10) set = set.take(10) }
// set.map((categoryId, _)).toIterator Iterator((categoryId, set.toList)) }) result.collect.foreach(println) Thread.sleep(1000000) }
}
class CategorySessionPartitioner(cids:List[Long]) extends Partitioner {
private val cidIndexMap: Map[Long, Int] = cids.zipWithIndex.toMap // 分区和品类id数量保持一致,可以保证一个的分区只有一个cid
override def numPartitions: Int = 10 // (Long,String) => (cid,sessionId)
override def getPartition(key: Any): Int = key match { // 使用这个cid在数组中的下标作为分区的索引非常合适 case (cid:Long,_) => cidIndexMap(cid)
}
}
/* 方案4: 对方案3做优化,减少一次shuffle 减少shuffle只能是去掉groupByKey 还得需要得到每一个cid的所有session的集合?! 怎样得到? rdd是分区的,mapPartitions(it => {}) 能不能让一个分区只有一个cid的所有数据 每个分区只有一种cid,如何做到每个分区只有一个cid? 用自定义区分器! 10cid,应该有10个分区 */
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
综合上述四种方法 最后一种方法是最完美的
本次的分享就到这里了,
好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”
一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注
我哦!
文章来源: buwenbuhuo.blog.csdn.net,作者:不温卜火,版权归原作者所有,如需转载,请联系作者。
原文链接:buwenbuhuo.blog.csdn.net/article/details/107644250
- 点赞
- 收藏
- 关注作者
评论(0)