2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
【摘要】
目录
案例三:电影评分数据分析
代码实现
Shuffle分区数
案例三:电影评分数据分析
使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:
对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高...
目录
案例三:电影评分数据分析
使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:
对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于200)。
数据格式如下,每行数据各个字段之间使用双冒号分开:
数据处理分析步骤如下:
- 第一步、读取电影评分数据,从本地文件系统读取
- 第二步、转换数据,指定Schema信息,封装到DataFrame
- 第三步、基于SQL方式分析
- 第四步、基于DSL方式分析
代码实现
电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:
-
package cn.itcast.sql
-
-
import java.util.Properties
-
-
import org.apache.spark.SparkContext
-
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-
import org.apache.spark.storage.StorageLevel
-
-
/**
-
* 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
-
*/
-
object SparkTop10Movie {
-
def main(args: Array[String]): Unit = {
-
val spark = SparkSession.builder()
-
.appName(this.getClass.getSimpleName.stripSuffix("$"))
-
.master("local[*]")
-
// TODO: 设置shuffle时分区数目
-
.config("spark.sql.shuffle.partitions", "4")
-
.getOrCreate()
-
val sc: SparkContext = spark.sparkContext
-
sc.setLogLevel("WARN")
-
import spark.implicits._
-
-
// 1. 读取电影评分数据,从本地文件系统读取
-
val rawRatingsDS: Dataset[String] = spark.read.textFile("data/input/rating_100k.data")
-
-
// 2. 转换数据
-
val ratingsDF: DataFrame = rawRatingsDS
-
// 过滤数据
-
.filter(line => null != line && line.trim.split("\t").length == 4)
-
// 提取转换数据
-
.mapPartitions{iter =>
-
iter.map{line =>
-
// 按照分割符分割,拆箱到变量中
-
val Array(userId, movieId, rating, timestamp) = line.trim.split("\t")
-
// 返回四元组
-
(userId, movieId, rating.toDouble, timestamp.toLong)
-
}
-
}
-
// 指定列名添加Schema
-
.toDF("userId", "movieId", "rating", "timestamp")
-
/*
-
root
-
|-- userId: string (nullable = true)
-
|-- movieId: string (nullable = true)
-
|-- rating: double (nullable = false)
-
|-- timestamp: long (nullable = false)
-
*/
-
ratingsDF.printSchema()
-
/*
-
+------+-------+------+---------+
-
|userId|movieId|rating|timestamp|
-
+------+-------+------+---------+
-
| 1| 1193| 5.0|978300760|
-
| 1| 661| 3.0|978302109|
-
| 1| 594| 4.0|978302268|
-
| 1| 919| 4.0|978301368|
-
+------+-------+------+---------+
-
*/
-
ratingsDF.show(4)
-
-
// TODO: 基于SQL方式分析
-
// 第一步、注册DataFrame为临时视图
-
ratingsDF.createOrReplaceTempView("view_temp_ratings")
-
-
// 第二步、编写SQL
-
val top10MovieDF: DataFrame = spark.sql(
-
"""
-
|SELECT
-
| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
-
|FROM
-
| view_temp_ratings
-
|GROUP BY
-
| movieId
-
|HAVING
-
| cnt_rating > 200
-
|ORDER BY
-
| avg_rating DESC, cnt_rating DESC
-
|LIMIT
-
| 10
-
""".stripMargin)
-
//top10MovieDF.printSchema()
-
top10MovieDF.show(10, truncate = false)
-
-
println("===============================================================")
-
-
// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
-
import org.apache.spark.sql.functions._
-
val resultDF: DataFrame = ratingsDF
-
// 选取字段
-
.select($"movieId", $"rating")
-
// 分组:按照电影ID,获取平均评分和评分次数
-
.groupBy($"movieId")
-
.agg(
-
round(avg($"rating"), 2).as("avg_rating"),
-
count($"movieId").as("cnt_rating")
-
)
-
// 过滤:评分次数大于200
-
.filter($"cnt_rating" > 200)
-
// 排序:先按照评分降序,再按照次数降序
-
.orderBy($"avg_rating".desc, $"cnt_rating".desc)
-
// 获取前10
-
.limit(10)
-
//resultDF.printSchema()
-
resultDF.show(10)
-
-
/*// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
-
// 结果DataFrame被使用多次,缓存
-
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
-
-
// 1. 保存MySQL数据库表汇总
-
resultDF
-
.coalesce(1)
-
.write
-
.mode("overwrite")
-
.option("driver", "com.mysql.jdbc.Driver")
-
.option("user", "root")
-
.option("password", "root")
-
.jdbc(
-
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8",
-
"top10_movies",
-
new Properties()
-
)
-
-
// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
-
resultDF
-
.coalesce(1)
-
.write.mode("overwrite")
-
.csv("data/output/top10-movies")
-
-
// 释放缓存数据
-
resultDF.unpersist()*/
-
-
spark.stop()
-
}
-
}
Shuffle分区数
运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。可以在构建SparkSession实例对象时进行设置
-
val spark = SparkSession.builder()
-
.appName(this.getClass.getSimpleName.stripSuffix("$"))
-
.master("local[*]")
-
// TODO: 设置shuffle时分区数目
-
.config("spark.sql.shuffle.partitions", "4")
-
.getOrCreate()
-
val sc: SparkContext = spark.sparkContext
-
sc.setLogLevel("WARN")
-
import spark.implicits._
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/115794520
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)