2021年大数据Spark(二十九):SparkSQL案例四开窗函数

举报
Lansonli 发表于 2021/09/29 01:55:59 2021/09/29
2.4k+ 0 0
【摘要】 目录   案例四:开窗函数 概述 介绍 聚合函数和开窗函数 开窗函数分类 ​​​​​​​聚合开窗函数 排序开窗函数  ROW_NUMBER顺序排序 ​​​​​​​RANK跳跃排序 ​​​​​​​ DENSE_RANK连续排序 ​​​​​​​NTILE分组排名[了解] ​​​​​​​...

目录

案例四:开窗函数

概述

介绍

聚合函数和开窗函数

开窗函数分类

​​​​​​​聚合开窗函数

排序开窗函数

 ROW_NUMBER顺序排序

​​​​​​​RANK跳跃排序

​​​​​​​ DENSE_RANK连续排序

​​​​​​​NTILE分组排名[了解]

​​​​​​​代码演示


​​​​​​​案例四:开窗函数

概述

https://www.cnblogs.com/qiuting/p/7880500.html

介绍

开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。

开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

聚合函数和开窗函数

聚合函数是将多行变成一行,count,avg....

开窗函数是将一行变成多行;

聚合函数如果要显示其他的列必须将列加入到group by中

开窗函数可以不使用group by,直接将所有信息显示出来

开窗函数分类

1.聚合开窗函数

聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。

2.排序开窗函数

排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

​​​​​​​聚合开窗函数

示例1

OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。

SQL标准允许将所有聚合函数用做聚合开窗函数。

spark.sql("select  count(name)  from scores").show

spark.sql("select name, class, score, count(name) over() name_count from scores").show

查询结果如下所示:

+----+-----+-----+----------+                                                   

|name|class|score|name_count|

+----+-----+-----+----------+

|  a1|    1|   80|        11|

|  a2|    1|   78|        11|

|  a3|    1|   95|        11|

|  a4|    2|   74|        11|

|  a5|    2|   92|        11|

|  a6|    3|   99|        11|

|  a7|    3|   99|        11|

|  a8|    3|   45|        11|

|  a9|    3|   55|        11|

| a10|    3|   78|        11|

| a11|    3|  100|        11|

+----+-----+-----+----------+

 示例2

OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。

如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。

开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。

与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。

下面的 SQL 语句用于显示按照班级分组后每组的人数:

OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。

spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show

查询结果如下所示:

+----+-----+-----+----------+                                                   

|name|class|score|name_count|

+----+-----+-----+----------+

|  a1|    1|   80|         3|

|  a2|    1|   78|         3|

|  a3|    1|   95|         3|

|  a6|    3|   99|         6|

|  a7|    3|   99|         6|

|  a8|    3|   45|         6|

|  a9|    3|   55|         6|

| a10|    3|   78|         6|

| a11|    3|  100|         6|

|  a4|    2|   74|         2|

|  a5|    2|   92|         2|

+----+-----+-----+----------+

排序开窗函数

 ROW_NUMBER顺序排序

row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号

注意:

在排序开窗函数中使用 PARTITION  BY 子句需要放置在ORDER  BY 子句之前。

 ●示例1

spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()

+----+-----+-----+----+

|name|class|score|rank|

+----+-----+-----+----+

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

|  a4|    2|   74|   3|

|  a2|    1|   78|   4|

| a10|    3|   78|   5|

|  a1|    1|   80|   6|

|  a5|    2|   92|   7|

|  a3|    1|   95|   8|

|  a6|    3|   99|   9|

|  a7|    3|   99|  10|

| a11|    3|  100|  11|

+----+-----+-----+----+

spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()

+----+-----+-----+----+                                                         

|name|class|score|rank|

+----+-----+-----+----+

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   5|

| a11|    3|  100|   6|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

+----+-----+-----+----+

​​​​​​​RANK跳跃排序

rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。

这个函数求出来的排名结果可以并列,并列排名之后的排名将是并列的排名加上并列数

简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名

●示例2

spark.sql("select name, class, score, rank() over(order by score) rank from scores").show()                                                     

+----+-----+-----+----+

|name|class|score|rank|

+----+-----+-----+----+

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

|  a4|    2|   74|   3|

| a10|    3|   78|   4|

|  a2|    1|   78|   4|

|  a1|    1|   80|   6|

|  a5|    2|   92|   7|

|  a3|    1|   95|   8|

|  a6|    3|   99|   9|

|  a7|    3|   99|   9|

| a11|    3|  100|  11|

+----+-----+-----+----+

spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()

+----+-----+-----+----+                                                         

|name|class|score|rank|

+----+-----+-----+----+

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   4|

| a11|    3|  100|   6|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

+----+-----+-----+----+

​​​​​​​ DENSE_RANK连续排序

dense_rank() over(order by  score) as  dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。

这个函数并列排名之后的排名只是并列排名加1

简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名

●示例3

spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()

+----+-----+-----+----+

|name|class|score|rank|

+----+-----+-----+----+

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

|  a4|    2|   74|   3|

|  a2|    1|   78|   4|

| a10|    3|   78|   4|

|  a1|    1|   80|   5|

|  a5|    2|   92|   6|

|  a3|    1|   95|   7|

|  a6|    3|   99|   8|

|  a7|    3|   99|   8|

| a11|    3|  100|   9|

+----+-----+-----+----+

spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()

+----+-----+-----+----+                                                         

|name|class|score|rank|

+----+-----+-----+----+

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   4|

| a11|    3|  100|   5|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

+----+-----+-----+----+

​​​​​​​NTILE分组排名[了解]

ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。

 示例4

spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()

+----+-----+-----+----+

|name|class|score|rank|

+----+-----+-----+----+

|  a8|    3|   45|   1|

|  a9|    3|   55|   1|

|  a4|    2|   74|   2|

|  a2|    1|   78|   2|

| a10|    3|   78|   3|

|  a1|    1|   80|   3|

|  a5|    2|   92|   4|

|  a3|    1|   95|   4|

|  a6|    3|   99|   5|

|  a7|    3|   99|   5|

| a11|    3|  100|   6|

+----+-----+-----+----+

spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()

+----+-----+-----+----+                                                         

|name|class|score|rank|

+----+-----+-----+----+

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   5|

| a11|    3|  100|   6|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

+----+-----+-----+----+

​​​​​​​代码演示


      package cn.itcast.sql
      import org.apache.spark.SparkContext
      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
      /**
       * Author itcast
       * Date 2020/9/21 9:33
       * Desc 使用SparkSQL支持的开窗函数/窗口函数完成对各个班级的学生成绩的排名
       */
      object RowNumberDemo {
        case class Score(name: String, clazz: Int, score: Int)
        def main(args: Array[String]): Unit = {
          //1.准备环境
          val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()
          val sc: SparkContext = spark.sparkContext
          sc.setLogLevel("WARN")
          import spark.implicits._
          //2.加载数据
          val scoreDF: DataFrame = sc.makeRDD(Array(
            Score("a1", 1, 80),
            Score("a2", 1, 78),
            Score("a3", 1, 95),
            Score("a4", 2, 74),
            Score("a5", 2, 92),
            Score("a6", 3, 99),
            Score("a7", 3, 99),
            Score("a8", 3, 45),
            Score("a9", 3, 55),
            Score("a10", 3, 78),
            Score("a11", 3, 100))
          ).toDF("name", "class", "score")
          scoreDF.createOrReplaceTempView("t_scores")
          scoreDF.show()
          /*
          +----+-----+-----+
          |name|class|score|num
          +----+-----+-----+
          |  a1|    1|   80|
          |  a2|    1|   78|
          |  a3|    1|   95|
          |  a4|    2|   74|
          |  a5|    2|   92|
          |  a6|    3|   99|
          |  a7|    3|   99|
          |  a8|    3|   45|
          |  a9|    3|   55|
          | a10|    3|   78|
          | a11|    3|  100|
          +----+-----+-----+
           */
          //使用ROW_NUMBER顺序排序
          spark.sql("select name, class, score, row_number() over(partition by class order by score) num from t_scores").show()
          //使用RANK跳跃排序
          spark.sql("select name, class, score, rank() over(partition by class order by score) num from t_scores").show()
          //使用DENSE_RANK连续排序
          spark.sql("select name, class, score, dense_rank() over(partition by class order by score) num from t_scores").show()
          /*
      ROW_NUMBER顺序排序--1234
      +----+-----+-----+---+
      |name|class|score|num|
      +----+-----+-----+---+
      |  a2|    1|   78|  1|
      |  a1|    1|   80|  2|
      |  a3|    1|   95|  3|
      |  a8|    3|   45|  1|
      |  a9|    3|   55|  2|
      | a10|    3|   78|  3|
      |  a6|    3|   99|  4|
      |  a7|    3|   99|  5|
      | a11|    3|  100|  6|
      |  a4|    2|   74|  1|
      |  a5|    2|   92|  2|
      +----+-----+-----+---+
      使用RANK跳跃排序--1224
      +----+-----+-----+---+
      |name|class|score|num|
      +----+-----+-----+---+
      |  a2|    1|   78|  1|
      |  a1|    1|   80|  2|
      |  a3|    1|   95|  3|
      |  a8|    3|   45|  1|
      |  a9|    3|   55|  2|
      | a10|    3|   78|  3|
      |  a6|    3|   99|  4|
      |  a7|    3|   99|  4|
      | a11|    3|  100|  6|
      |  a4|    2|   74|  1|
      |  a5|    2|   92|  2|
      +----+-----+-----+---+
      DENSE_RANK连续排序--1223
      +----+-----+-----+---+
      |name|class|score|num|
      +----+-----+-----+---+
      |  a2|    1|   78|  1|
      |  a1|    1|   80|  2|
      |  a3|    1|   95|  3|
      |  a8|    3|   45|  1|
      |  a9|    3|   55|  2|
      | a10|    3|   78|  3|
      |  a6|    3|   99|  4|
      |  a7|    3|   99|  4|
      | a11|    3|  100|  5|
      |  a4|    2|   74|  1|
      |  a5|    2|   92|  2|
      +----+-----+-----+---+
           */
          /*
          val sql =
            """
              |select 字段1,字段2,字段n,
              |row_number() over(partition by 字段1 order by 字段2 desc) num
              |from 表名
              |having num <= 3
              |""".stripMargin
          import org.apache.spark.sql.functions._
          df.withColumn(
            "num",
            row_number().over(Window.partitionBy('字段1).orderBy('字段2.desc))
          ).filter('num <= 3).show(false)
           */
        }
      }
  
 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/115807813

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。