Spark SQL案例:分组排行榜

举报
howard2005 发表于 2022/06/16 22:32:08 2022/06/16
【摘要】 文章目录 一、提出任务二、完成任务(一)新建Maven项目(二)添加相关依赖和构建插件(三)创建日志属性文件(四)创建分组排行榜单例对象(五)本地运行程序,查看结果(六)交互式操作查看中间结果1、...

一、提出任务

  • 分组求TopN是大数据领域常见的需求,主要是根据数据的某一列进行分组,然后将分组后的每一组数据按照指定的列进行排序,最后取每一组的前N行数据。
  • 有一组学生成绩数据
张三丰 90
李孟达 85
张三丰 87
王晓云 93
李孟达 65
张三丰 76
王晓云 78
李孟达 60
张三丰 94
王晓云 97
李孟达 88
张三丰 80
王晓云 88
李孟达 82
王晓云 98

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 同一个学生有多门成绩,现需要计算每个学生分数最高的前3个成绩,期望输出结果如下所示:
张三丰:94
张三丰:90
张三丰:87
李孟达:88
李孟达:85
李孟达:82
王晓云:98
王晓云:97
王晓云:93

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 数据表t_grade
    在这里插入图片描述
  • 执行查询
    在这里插入图片描述
SELECT * FROM t_grade tg 
   WHERE (SELECT COUNT(*) FROM t_grade 
	           WHERE tg.name = t_grade.name 
						    AND score >= tg.score						 		
	        ) <= 3 ORDER BY name, score DESC;    

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 预备工作:启动集群的HDFS与Spark
    在这里插入图片描述
  • 将成绩文件 - grades.txt上传到HDFS上/input目录
    在这里插入图片描述

二、完成任务

(一)新建Maven项目

  • 设置项目信息(项目名、保存位置、组编号、项目编号)
    在这里插入图片描述
  • 单击【Finish】按钮
    在这里插入图片描述
  • java目录改成scala目录
    在这里插入图片描述

(二)添加相关依赖和构建插件

  • pom.xml文件里添加依赖与Maven构建插件
    在这里插入图片描述
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.huawei.sql</groupId>
    <artifactId>GradeTopNBySQL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

(三)创建日志属性文件

  • 在资源文件夹里创建日志属性文件 - log4j.properties
    在这里插入图片描述
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spark.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

(四)创建分组排行榜单例对象

  • net.huawei.sql包里创建GradeTopNBySQL单例对象
    在这里插入图片描述
package net.huawei.sql

import org.apache.spark.sql.{Dataset, SparkSession}

/**
 * 功能:利用Spark SQL实现分组排行榜
 * 作者:华卫
 * 日期:2022年06月15日
 */
object GradeTopNBySQL {
  def main(args: Array[String]): Unit = {
    // 创建或得到Spark会话对象
    val spark = SparkSession.builder()
      .appName("GradeTopNBySQL")
      .master("local[*]")
      .getOrCreate()
    // 读取HDFS上的成绩文件
    val lines: Dataset[String] = spark.read.textFile("hdfs://master:9000/input/grades.txt")
    // 导入隐式转换
    import spark.implicits._
    // 创建成绩数据集
    val gradeDS: Dataset[Grade] = lines.map(
      line => { val fields = line.split(" ")
        val name = fields(0)
        val score = fields(1).toInt
        Grade(name, score)
      })
    // 将数据集转换成数据帧
    val df = gradeDS.toDF()
    // 基于数据帧创建临时表
    df.createOrReplaceTempView("t_grade")
    // 查询临时表,实现分组排行榜
    val top3 = spark.sql(
      """
        |SELECT name, score FROM
        |    (SELECT name, score, row_number() OVER (PARTITION BY name ORDER BY score DESC) rank from t_grade) t
        |    WHERE t.rank <= 3
        |""".stripMargin
    )
    // 显示分组排行榜结果
    top3.show()

    // 按指定格式输出分组排行榜
    top3.foreach(row => println(row(0) + ": " + row(1)))

    // 关闭Spark会话
    spark.close()
  }

  // 定义成绩样例类
  case class Grade(name: String, score: Int)
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

(五)本地运行程序,查看结果

  • 在控制台查看输出结果
    在这里插入图片描述

(六)交互式操作查看中间结果

1、读取成绩文件得到数据集

  • 执行命令:val lines: Dataset[String] = spark.read.textFile("hdfs://master:9000/input/grades.txt")
    在这里插入图片描述

2、定义成绩样例类

  • 执行命令:case class Grade(name: String, score: Int)
    在这里插入图片描述

3、导入隐式转换

在这里插入图片描述

4、创建成绩数据集

val gradeDS: Dataset[Grade] = lines.map(
      line => { val fields = line.split(" ")
        val name = fields(0)
        val score = fields(1).toInt
        Grade(name, score)
      })

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 执行上述语句
    在这里插入图片描述

5、将数据集转换成数据帧

  • 执行命令:val df = gradeDS.toDF()
    在这里插入图片描述

6、基于数据帧创建临时表

  • 执行命令:df.createOrReplaceTempView("t_grade")
    在这里插入图片描述

7、查询临时表,实现分组排行榜

  val top3 = spark.sql(
      """
        |SELECT name, score FROM
        |    (SELECT name, score, row_number() OVER (PARTITION BY name ORDER BY score DESC) rank from t_grade) t
        |    WHERE t.rank <= 3
        |""".stripMargin
    )

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 执行上述语句
    在这里插入图片描述

8、显示分组排行榜结果

  • 执行命令:top3.show()
    在这里插入图片描述

三、可能会出现的问题

  • MySQL支持这样的子查询
    在这里插入图片描述
  • 但是Spark SQL不支持,会抛出异常AnalysisException: Correlated column is not allowed in a non-equality predicate
    在这里插入图片描述

文章来源: howard2005.blog.csdn.net,作者:howard2005,版权归原作者所有,如需转载,请联系作者。

原文链接:howard2005.blog.csdn.net/article/details/125303513

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。