2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析

举报
Lansonli 发表于 2021/09/29 00:11:05 2021/09/29
【摘要】 目录 案例三:电影评分数据分析 代码实现 Shuffle分区数 案例三:电影评分数据分析      使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明: 对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高...

目录

案例三:电影评分数据分析

代码实现

Shuffle分区数


案例三:电影评分数据分析

     使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:

对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于200)

数据格式如下,每行数据各个字段之间使用双冒号分开:

 

数据处理分析步骤如下:

  1. 第一步、读取电影评分数据,从本地文件系统读取
  2.  第二步、转换数据,指定Schema信息,封装到DataFrame
  3.  第三步、基于SQL方式分析
  4.  第四步、基于DSL方式分析

 

代码实现

     电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:


  
  1. package cn.itcast.sql
  2. import java.util.Properties
  3. import org.apache.spark.SparkContext
  4. import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
  5. import org.apache.spark.storage.StorageLevel
  6. /**
  7.  * 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
  8.  */
  9. object SparkTop10Movie {
  10.     def main(args: Array[String]): Unit = {
  11.         val spark = SparkSession.builder()
  12.           .appName(this.getClass.getSimpleName.stripSuffix("$"))
  13.           .master("local[*]")
  14.           // TODO: 设置shuffle时分区数目
  15.           .config("spark.sql.shuffle.partitions", "4")
  16.           .getOrCreate()
  17.         val sc: SparkContext = spark.sparkContext
  18.         sc.setLogLevel("WARN")
  19.         import spark.implicits._
  20.         
  21.         // 1. 读取电影评分数据,从本地文件系统读取
  22.         val rawRatingsDS: Dataset[String] = spark.read.textFile("data/input/rating_100k.data")
  23.         
  24.         // 2. 转换数据
  25.         val ratingsDF: DataFrame = rawRatingsDS
  26.             // 过滤数据
  27.             .filter(line => null != line && line.trim.split("\t").length == 4)
  28.             // 提取转换数据
  29.             .mapPartitions{iter =>
  30.                 iter.map{line =>
  31.                     // 按照分割符分割,拆箱到变量中
  32.                     val Array(userId, movieId, rating, timestamp) = line.trim.split("\t")
  33.                     // 返回四元组
  34.                     (userId, movieId, rating.toDouble, timestamp.toLong)
  35.                 }
  36.             }
  37.             // 指定列名添加Schema
  38.             .toDF("userId", "movieId", "rating", "timestamp")
  39.         /*
  40.             root
  41.              |-- userId: string (nullable = true)
  42.              |-- movieId: string (nullable = true)
  43.              |-- rating: double (nullable = false)
  44.              |-- timestamp: long (nullable = false)
  45.         */
  46.         ratingsDF.printSchema()
  47.         /*
  48.             +------+-------+------+---------+
  49.             |userId|movieId|rating|timestamp|
  50.             +------+-------+------+---------+
  51.             |     1|   1193|   5.0|978300760|
  52.             |     1|    661|   3.0|978302109|
  53.             |     1|    594|   4.0|978302268|
  54.             |     1|    919|   4.0|978301368|
  55.             +------+-------+------+---------+
  56.          */
  57.         ratingsDF.show(4)
  58.         
  59.         // TODO: 基于SQL方式分析
  60.         // 第一步、注册DataFrame为临时视图
  61.         ratingsDF.createOrReplaceTempView("view_temp_ratings")
  62.         
  63.         // 第二步、编写SQL
  64.         val top10MovieDF: DataFrame = spark.sql(
  65.             """
  66.               |SELECT
  67.               |  movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
  68.               |FROM
  69.               |  view_temp_ratings
  70.               |GROUP BY
  71.               |  movieId
  72.               |HAVING
  73.               |  cnt_rating > 200
  74.               |ORDER BY
  75.               |  avg_rating DESC, cnt_rating DESC
  76.               |LIMIT
  77.               |  10
  78.             """.stripMargin)
  79.         //top10MovieDF.printSchema()
  80.         top10MovieDF.show(10, truncate = false)
  81.         
  82.         println("===============================================================")
  83.         
  84.         // TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
  85.         import org.apache.spark.sql.functions._
  86.         val resultDF: DataFrame = ratingsDF
  87.             // 选取字段
  88.             .select($"movieId", $"rating")
  89.             // 分组:按照电影ID,获取平均评分和评分次数
  90.             .groupBy($"movieId")
  91.             .agg(
  92.                 round(avg($"rating"), 2).as("avg_rating"),
  93.                 count($"movieId").as("cnt_rating")
  94.             )
  95.             // 过滤:评分次数大于200
  96.             .filter($"cnt_rating" > 200)
  97.             // 排序:先按照评分降序,再按照次数降序
  98.             .orderBy($"avg_rating".desc, $"cnt_rating".desc)
  99.             // 获取前10
  100.             .limit(10)
  101.         //resultDF.printSchema()
  102.         resultDF.show(10)
  103.         /*// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
  104.         // 结果DataFrame被使用多次,缓存
  105.         resultDF.persist(StorageLevel.MEMORY_AND_DISK)
  106.         
  107.         // 1. 保存MySQL数据库表汇总
  108.         resultDF
  109.             .coalesce(1)
  110.             .write
  111.             .mode("overwrite")
  112.             .option("driver", "com.mysql.jdbc.Driver")
  113.             .option("user", "root")
  114.             .option("password", "root")
  115.             .jdbc(
  116.                 "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8",
  117.                 "top10_movies",
  118.                 new Properties()
  119.             )
  120.         
  121.         // 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
  122.         resultDF
  123.             .coalesce(1)
  124.             .write.mode("overwrite")
  125.             .csv("data/output/top10-movies")
  126.         
  127.         // 释放缓存数据
  128.         resultDF.unpersist()*/
  129.         spark.stop()
  130.     }
  131. }

 

​​​​​​​Shuffle分区数

运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。

 

原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。可以在构建SparkSession实例对象时进行设置


  
  1. val spark = SparkSession.builder()
  2.   .appName(this.getClass.getSimpleName.stripSuffix("$"))
  3.   .master("local[*]")
  4.   // TODO: 设置shuffle时分区数目
  5.   .config("spark.sql.shuffle.partitions", "4")
  6.   .getOrCreate()
  7. val sc: SparkContext = spark.sparkContext
  8. sc.setLogLevel("WARN")
  9. import spark.implicits._

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/115794520

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。