Spark 案例实操

举报
lwq1228 发表于 2021/03/19 17:16:51 2021/03/19
【摘要】 Spark 学习中的案例实操,包括Top10 热门品类统计、Top10 热门品类中每个品类的 Top10 活跃 Session 统计、页面单跳转换率统计

一、基础数据

上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的4种行为:搜索,点击,下单,支付。 数据规则如下:
➢ 数据文件中每行数据采用下划线分隔数据
➢ 每一行数据表示用户的一次行为,这个行为只能是4种行为的一种
➢ 如果搜索关键字为null,表示数据不是搜索数据
➢ 如果点击的品类ID和产品ID-1,表示数据不是点击数据
➢ 针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,id之间采用逗号分隔,如果本次不是下单行为,则数据采用null表示
➢ 支付行为和下单行为类似

二、详细字段说明

编号 字段名称 字段类型 字段含义
1 date String 用户点击行为的日期
2 user_id Long 用户的 ID
3 session_id String Session 的 ID
4 page_id Long 某个页面的 ID
5 action_time String 动作的时间点
6 search_keyword String 用户搜索的关键词
7 click_category_id Long 某一个商品品类的 ID
8 click_product_id Long 某一个商品的 ID
9 order_category_ids String 一次订单中所有品类的 ID 集合
10 order_product_ids String 一次订单中所有商品的 ID 集合
11 pay_category_ids String 一次支付中所有品类的 ID 集合
12 pay_product_ids String 一次支付中所有商品的 ID 集合
13 city_id Long 城市 id

三、需求说明1-Top10 热门品类

品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。
我们按照每个品类的点击、下单、支付的量来统计热门品类。
鞋   点击数 下单数 支付数
衣服 点击数 下单数 支付数
电脑 点击数 下单数 支付数

综合排名 = 先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。

1、实现方案1

分别统计每个品类点击的次数,下单的次数和支付的次数:
(品类,点击总数)(品类,下单总数)(品类,支付总数)
object Spark01_Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
  // TODO: Top10热门品类
  val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
  // 创建Spark上下文环境对象(连接对象)
  val sc = new SparkContext(sparConf)

  // 1、读取原始日志数据
  val actionRdd = sc.textFile("input/user_visit_action.txt")
  actionRdd.cache()

  // 2、统计品类的点击数量:(品类ID,点击数量)
  val clickActionRdd = actionRdd.filter(action => {
    val datas = action.split("_")
    datas(6) != "-1"
  })

  val clickCountRdd = clickActionRdd.map(
    action => {
      val datas = action.split("_")
      (datas(6), 1)
    }
  ).reduceByKey(_ + _).map {
    case (cid, cnt) => {
      (cid, (cnt, 0, 0))
    }
  }

  // 3、统计品类的下单数量:(品类ID,下单数量)
  val orderActionRdd = actionRdd.filter(action => {
    val datas = action.split("_")
    datas(8) != "null"
  })

  // orderid => 1,2,3
  // 扁平化操作转为:((1,1)(2,1)(3,1))
  val orderCountRdd = orderActionRdd.flatMap(action => {
    val datas = action.split("_")
    val cid = datas(8)
    val cids = cid.split(",")
    cids.map(id => (id, 1))
  }).reduceByKey(_ + _).map {
    case (cid, cnt) => {
      (cid, (0, cnt, 0))
    }
  }

  // 4、统计品类的支付数量:(品类ID,支付数量)
  val payActionRdd = actionRdd.filter(action => {
    val datas = action.split("_")
    datas(10) != "null"
  })

  val payCountRdd = payActionRdd.flatMap(action => {
    val datas = action.split("_")
    val cid = datas(10)
    val cids = cid.split(",")
    cids.map(id => (id, 1))
  }).reduceByKey(_ + _).map {
    case (cid, cnt) => {
      (cid, (0, 0, cnt))
    }
  }

  // 5、将三个数据源合并在一起,进行聚合计算
  val sourceRdd = clickCountRdd.union(orderCountRdd).union(payCountRdd)
  val analysisRdd = sourceRdd.reduceByKey(
    (t1, t2) => {
      (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
    }
  )

  val resultRdd = analysisRdd.sortBy(_._2, ascending = false).take(10)

  // 6、将结果采集到控制台打印出来
  resultRdd.foreach(println)

  // 关闭Spark连接
  sc.stop()
}
}

2、实现方案2

一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类, (点击总数,下单总数,支付总数) )
object Spark02_Req1_HotCategoryTop10Analysis {
  def main(args: Array[String]): Unit = {
    // TODO: Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    // 创建Spark上下文环境对象(连接对象)
    val sc = new SparkContext(sparConf)

    // Q:存在大量的shuffle操作(reduceByKey)
    // reduceByKey 聚合算子,spark会自动优化

    // 1、读取原始日志数据
    val actionRdd = sc.textFile("input/user_visit_action.txt")

    // 2、将数据转换结构
    // 点击场合:(品类ID,(1,0,0))
    // 下达场合:(品类ID,(0,1,0))
    // 支付场合:(品类ID,(0,0,1))
    val flatRdd = actionRdd.flatMap(action => {
      val datas = action.split("_")
      if (datas(6) != "-1") {
        // 点击场合
        List((datas(6), (1, 0, 0)))
      } else if (datas(8) != "null") {
        // 下达场合
        val cids = datas(8).split(",")
        cids.map(id => (id, (0, 1, 0)))
      } else if (datas(10) != "null") {
        // 支付场合
        val cids = datas(10).split(",")
        cids.map(id => (id, (0, 0, 1)))
      } else {
        Nil
      }
    })


    // 3、进行聚合计算
    val analysisRdd = flatRdd.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )

    val resultRdd = analysisRdd.sortBy(_._2, ascending = false).take(10)

    // 6、将结果采集到控制台打印出来
    resultRdd.foreach(println)

    // 关闭Spark连接
    sc.stop()
  }
}

3、实现方案3

使用累加器的方式聚合数据
object Spark04_Req1_HotCategoryTop10Analysis {
  def main(args: Array[String]): Unit = {
    // TODO: Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    // 创建Spark上下文环境对象(连接对象)
    val sc = new SparkContext(sparConf)

    // 1、读取原始日志数据
    val actionRdd = sc.textFile("input/user_visit_action.txt")

    // 使用累加器
    val acc = new HotCategoryAccumulator
    sc.register(acc)

    actionRdd.foreach(
      action => {
        val datas = action.split("_")
        if (datas(6) != "-1") {
          // 点击场合
          acc.add(datas(6), "click")
        } else if (datas(8) != "null") {
          // 下达场合
          val cids = datas(8).split(",")
          cids.foreach(id => {
            acc.add(id, "order")
          })
        } else if (datas(10) != "null") {
          // 支付场合
          val cids = datas(10).split(",")
          cids.foreach(id => {
            acc.add(id, "pay")
          })
        }
      }
    )

    // 获取累加器里面的值
    val accValue = acc.value.values

    // 排序并获取结果
    val resultRdd = accValue.toList.sortWith((left, right) => {
      if (left.clickCnt > right.clickCnt) {
        true
      } else if (left.clickCnt == right.clickCnt) {
        if (left.orderCnt > right.orderCnt) {
          true
        } else if (left.orderCnt == right.orderCnt) {
          left.payCnt > right.payCnt
        } else {
          false
        }
      } else {
        false
      }
    }).take(10)

    // 6、将结果采集到控制台打印出来
    resultRdd.foreach(println)

    // 关闭Spark连接
    sc.stop()
  }

  case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)

  /**
   * 自定义累加器
   * 1、继承AccumulatorV2,定义泛型
   * IN:(品类ID,行为类型)
   * OUT:mutable.Map[String, HotCategory]
   */
  class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {
    private val hcMap = mutable.Map[String, HotCategory]()

    override def isZero: Boolean = {
      hcMap.isEmpty
    }

    override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
      new HotCategoryAccumulator
    }

    override def reset(): Unit = {
      hcMap.clear()
    }

    override def add(v: (String, String)): Unit = {
      val cid = v._1
      val actionType = v._2

      val category = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
      if (actionType == "click") {
        category.clickCnt += 1
      } else if (actionType == "order") {
        category.orderCnt += 1
      } else if (actionType == "pay") {
        category.payCnt += 1
      }

      hcMap.update(cid, category)
    }

    override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
      val map1 = this.hcMap
      val map2 = other.value

      map2.foreach {
        case (cid, hc) => {
          val category = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
          category.clickCnt += hc.clickCnt
          category.orderCnt += hc.orderCnt
          category.payCnt += hc.payCnt
          map1.update(cid, category)
        }
      }
    }

    override def value: mutable.Map[String, HotCategory] = {
      hcMap
    }
  }
}

四、需求说明2-Top10 热门品类中每个品类的 Top10 活跃 Session 统计

Top10热门品类中每个品类的Top10活跃Session统计:在需求一的基础上,增加每个品类用户session的点击统计

实现方案

object Spark05_Req2_HotCategoryTop10SessionAnalysis {
  def main(args: Array[String]): Unit = {
    // TODO: Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    // 创建Spark上下文环境对象(连接对象)
    val sc = new SparkContext(sparConf)

    // 1、读取原始日志数据
    val actionRdd = sc.textFile("input/user_visit_action.txt")
    actionRdd.cache()

    // 采集热门品类前10的数据
    val top10Ids = top10Category(actionRdd)

    // 过滤原始数据,保留点击和前10品类ID
    val filterActionRdd = actionRdd.filter(
      action => {
        val datas = action.split("_")
        if (datas(6) != "-1") {
          top10Ids.contains(datas(6))
        } else {
          false
        }
      }
    )

    // 2、根据品类ID和sessionId进行点击量的统计
    val reduceRdd = filterActionRdd.map(action => {
      val datas = action.split("_")
      ((datas(6), datas(2)), 1)
    }).reduceByKey(_ + _)

    // 3、将统计的结果进行结构的转换
    val mapRdd = reduceRdd.map {
      case ((cid, sid), sum) => {
        (cid, (sid, sum))
      }
    }

    // 4、相同的品类进行分组
    val groupRdd = mapRdd.groupByKey()

    // 生成返回结果
    val resultRdd = groupRdd.mapValues(iter => {
      iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
    })

    resultRdd.foreach(println)

    // 关闭Spark连接
    sc.stop()
  }

  def top10Category(actionRdd: RDD[String]): Array[String] = {
    val flatRdd = actionRdd.flatMap(action => {
      val datas = action.split("_")
      if (datas(6) != "-1") {
        // 点击场合
        List((datas(6), (1, 0, 0)))
      } else if (datas(8) != "null") {
        // 下达场合
        val cids = datas(8).split(",")
        cids.map(id => (id, (0, 1, 0)))
      } else if (datas(10) != "null") {
        // 支付场合
        val cids = datas(10).split(",")
        cids.map(id => (id, (0, 0, 1)))
      } else {
        Nil
      }
    })


    // 3、进行聚合计算
    val analysisRdd = flatRdd.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )

    analysisRdd.sortBy(_._2, ascending = false).take(10).map(_._1)
  }
}

五、需求说明3-页面单跳转换率统计

计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳, 7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。

实现方案

object Spark07_Req3_PageflowAnalysis {
  def main(args: Array[String]): Unit = {
    // TODO: Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    // 创建Spark上下文环境对象(连接对象)
    val sc = new SparkContext(sparConf)

    // 1、读取原始日志数据
    val actionRdd = sc.textFile("input/user_visit_action.txt")

    val actionDataRdd = actionRdd.map(action => {
      val datas = action.split("_")
      UserVisitAction(
        datas(0), datas(1).toLong, datas(2), datas(3).toLong, datas(4), datas(5), datas(6).toLong,
        datas(7).toLong, datas(8), datas(9), datas(10), datas(11), datas(12).toLong
      )
    })

    actionDataRdd.cache()
    // TODO:对指定的页面连续跳转进行统计
    val ids = List(1L, 2L, 3L, 4L, 5L, 6L, 7L)
    val okFlowIds = ids.zip(ids.tail)

    // TODO: 计算分母
    val pageIdToCountMap = actionDataRdd.filter(action => {
      ids.init.contains(action.page_id)
    }).map(action => {
      (action.page_id, 1L)
    }).reduceByKey(_ + _).collect().toMap

    // TODO:计算分子
    // 根据Session进行分组
    val sessionRdd = actionDataRdd.groupBy(_.session_id)
    // 分组后,根据访问时间进行排序,默认升序
    val mvRdd = sessionRdd.mapValues(
      iter => {
        // 对各组数据按照访问时间进行排序
        val sortList = iter.toList.sortBy(_.action_time)
        // 取得排序后的页面ID集合
        val flowIds = sortList.map(_.page_id)
        // 通过拉链操作将数据转化
        // 【1,2,3,4】转成【(1,2),(2,3),(3,4)】
        val pageFlowIds = flowIds.zip(flowIds.tail)
        // 将不合法的页面跳转进行过滤
        // 再次转换
        // 【(1,2),(2,3),(3,4)】转成【((1,2),1),((2,3),1),((3,4),2)】
        pageFlowIds.filter(t => {
          okFlowIds.contains(t)
        }).map(t => (t, 1))
      }
    )

    // 把【((1,2),1),((2,3),1),((3,4),2)】 list转成单个的元素
    val flatRdd = mvRdd.map(_._2).flatMap(list => list)
    // 对所有单个元素做聚合((1,2),sum)
    val dataRdd = flatRdd.reduceByKey(_ + _)

    // 计算单挑转换率
    // 分子除以分母
    dataRdd.foreach {
      case ((pageId1, pageId2), sum) => {
        val lon = pageIdToCountMap.getOrElse(pageId1, 0L)
        println(s"页面${pageId1}跳转到页面${pageId2}单跳转换率为" + (sum.toDouble / lon))
      }
    }

    // 关闭Spark连接
    sc.stop()
  }

  /**
   * 用户访问动作表
   *
   * @param date               用户点击行为的日期
   * @param user_id            用户的 ID
   * @param session_id         Session 的 ID
   * @param page_id            某个页面的 ID
   * @param action_time        动作的时间点
   * @param search_keyword     用户搜索的关键词
   * @param click_category_id  某一个商品品类的 ID
   * @param click_product_id   某一个商品的 ID
   * @param order_category_ids 一次订单中所有品类的 ID 集合
   * @param order_product_ids  一次订单中所有商品的 ID 集合
   * @param pay_category_ids   一次支付中所有品类的 ID 集合
   * @param pay_product_ids    一次支付中所有商品的 ID 集合
   * @param city_id            城市 id
   */
  case class UserVisitAction(date: String, user_id: Long, session_id: String, page_id: Long, action_time: String,
                             search_keyword: String, click_category_id: Long, click_product_id: Long,
                             order_category_ids: String, order_product_ids: String, pay_category_ids: String,
                             pay_product_ids: String, city_id: Long)

}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。