SparkSQL核心知识

举报
唐TTT 发表于 2019/01/18 20:09:46 2019/01/18
【摘要】 一、SparkSQL概述1、概念 官网:http://spark.apache.org/sql/ Spark SQK是Spark用来处理结构化数据(结构化数据可以来自外部结构化数据源也可以通过RDD获取)的一个模块 外部的结构化数据源包括 Json,parquet(默认),rmdbs,hive等2、Spark SQL的优点 mapreduce ...

一、SparkSQL概述

1、概念

   官网:http://spark.apache.org/sql/


       Spark SQK是Spark用来处理结构化数据(结构化数据可以来自外部结构化数据源也可以通过RDD获取)的一个模块


        外部的结构化数据源包括 Json,parquet(默认),rmdbs,hive等


2、Spark SQL的优点

     mapreduce             hive(sql框架)减少代码编写


     sparkcore              sparksql(sql框架) 


      hive将sql转换成mapreduce,然后提交到集群上执行,大大简化了mapreduce的程序的复杂性,由于mapreduce这种计算模型。因此spark sql就应运而生了 


优点: 1、容易整合


            2、同一的数据访问方式


            3、兼容hive


            4、标准的数据连接


3、Spark SQL版本迭代

1)sparkSQL的前身是shark


2)spark-1.1(2014-9-11)开始的引入sparksql,对hive进行无缝的兼容


3)spark-1.3:增加了DataFrame的API


4)spark-1.4:增加了窗口分析函数


5)spark-1.5:增加了UDF函数


6)spark-1.6:引入DataSet SparkSession


7)spark-2.x:SparkSQL+DataFrame+DataSet(正式版本),Structured Streaming(DataSet),引入SparkSession 统一了 RDD,DataFrame,DataSet 的编程入口


 


二、SparkSession

1、介绍

       SparkSession实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样可以使用的。SparkSession内部封装了SparkContext,所有计算实际上是由SparkContext完成的。


2、特点

1)为用户提供了同一的切入点使用spark各项功能


2)允许用户通过它调用DataFrame和DataSet相关API来编写程序


3)减少用户需要了解的一些概念,可以很容易的与Spark进行交互


4)与spark交互之时不需要显示创建sparkconf,sparkcontext以及sparksql,这些对象已经封闭在sparksession中


5)sparksession提供对hive特征的内部支持,用hive sql写sql语句访问hive udfs,从hive表中读取数据


                      在创建对象的时候加上: enableHiveSupport();


                         hive url/metstore:元数据库在哪里;


                        hive  warehouse:真实数据在哪里


3、SparkSession的创建

  1)如果在spark-shell中


[qyl@qyl02 ~]$ ~/apps/spark-2.3.1-bin-hadoop2.7/bin/spark-shell \

> --master spark://qyl02:7077 \

> --executor-memory 512m \

> --total-executor-cores 1

2)在idea中创建SparkSession


val spark=SparkSession.builder()

                      .appName("sparksqlexample")

                      .master("local")

                      .getOrCreate()

 

三、RDD/DataFrame/DataSet

1、RDD的局限性

           RDD仅表示数据集,RDD没有元数据,也就是说没有字段语义定义


2、什么是DataFrame? 

             是按列名的方式去组织的一个分布式数据集(RDD)


          由于RDD的局限性,Spark产生了DataFrame


         DataFrame=RDD+Schema=schemaRDD


               其中Schenam就是元数据,是语义描述


特点:


      内部数据无类型,统一为Row


      DataFrame是一种特殊类型的DataSet,DataSet[Row]=DataFrame


      DataFrame自带优化器Catalyst,可以自动优化程序


      DataFrame提供了一整套的Data Source API


 


3、DataSet的产生

    Row 运行时类型检查,比如salary是字符串类型,下面的语句也只有运行时才进行类型检查


dataframe.filter("salary>1000").show()

 由于DataFrame的数据类型统一是Row,所以DataFrame也是有缺点的。


明显缺点:


            1、Row不能直接操作domain对象


            2、函数风格编程,没有面向对象风格的API


所以,Spark SQL引入了DataSet,扩展了DataFrmae的API,提供了编译时类型检查,面向对象风格的API


4、Spark SQL程序基本编写步骤

 1)创建SparkSession对象


val sparksession:SparkSession=SparkSession.builder()

                                          .appName("mysparkSession")

                                          .master("local")

                                          .getOrCreate()

2)创建DataFrame或者DataSet


3)在DataFrame或者DataSet之上进行转换和Action


4)返回结果


5、创建DataFrame

创建DataFrame有三种方式:  val studentRDD


1、导入隐式转换


import spark.implicits._

  val studentDF=studentRDD.toDF

2、使用spark.sqlcontext.createDataFrame(studentRDD,schema)方法创建


valstudentRowRDD=studentRDD.map(student=>Row(student.getName,student.getAge,student.

 getClass))

 

val schema=StructType(fields=List(

        StructField("name",DataTypes.StringType,false),

        StructField("age",DataTypes.IntegerType,false),

        StructField("class",DataTypes.StringTyps,false)

))

 

val studentDF=spark.createDataFrame(studentRowRDD,schema)

 

四、Spark SQL 的wordcount

1、准备数据 hello.txt

hello word hello test

hello you hello me

you is beautiful and is pure

2、编写代码

package com.qyl

 

import org.apache.spark.sql.SparkSession

 

object WordCount {

  def main(args: Array[String]): Unit = {

 

    /*

    * 1.创建编程入口

    * */

    val spark = SparkSession.builder()

      .master("local[2]")

      .appName("WordCount")

      .getOrCreate()

    /*

    * 2.读取文件,转成DataFrame

    * */

    import spark.implicits._

    val helloDF = spark.read.text("data/hello.txt").toDF("line")

    /*

    * 3.创建临时视图

    * */

    helloDF.createOrReplaceTempView("hellotable")

    println("--------------------split拆分--------------")

    var sql=

      """

        |select

        |split(line," ")

        |from hellotable

      """.stripMargin

    println("-----------explode压平----------------------")

    sql=

      """

        |select

        |explode(split(line," ")) as word

        |from hellotable

      """.stripMargin

    println("------------统计结果------------------")

    sql="""

      |select

      | tmp.word,

      | count(tmp.word) as count

      |from (

      |  select

      |     explode(split(line, '\\s+')) as word

      |  from hellotable

      |) tmp

      |group by tmp.word

      |order by count desc

    """.stripMargin

 

    spark.sql(sql).show()

    spark.stop()

  }

}

3、结果

|     word|count|

+---------+-----+

|    hello|    4|

|       is|    2|

|      you|    2|

|       me|    1|

|     pure|    1|

|     word|    1|

|     test|    1|

|      and|    1|

|beautiful|    1|

五、Spark SQL高级用法

1、SparkSQL自定义普通函数

/**

  * SparkSQL自定义UDF操作:

  *

  *  1、编写一个UDF函数,输入输出参数

  *  2、向SQLContext进行注册,该UDF

  *  3、就直接使用

  *

  *  案例:通过计算字符串长度的函数,来学习如何自定义UDF

  */

object _02SparkSQLUDFOps {

    def main(args: Array[String]): Unit = {

        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)

        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

        Logger.getLogger("org.project-spark").setLevel(Level.WARN)

 

        val spark = SparkSession.builder()

            .master("local[2]")

            .appName("_02SparkSQLUDFOps")

            //            .enableHiveSupport()

            .getOrCreate()

        //第二步:向SQLContext进行注册,该UDF

        spark.udf.register[Int, String]("strLen", str => strLen(str))

 

        val topnDF = spark.read.json("data/sql/people.json")

        topnDF.createOrReplaceTempView("people")

        //自定义字符串长度函数,来就去表中name的长度

        //第三步:就直接使用

        val sql =

            """

              |select

              | name,

              | strLen(name) nameLen

              |from people

            """.stripMargin

 

        spark.sql(sql).show()

        spark.stop()

    }

 

    //第一步:编写一个UDF函数,输入输出参数

    def strLen(str:String):Int = str.length

}

2、自定义聚集(UDAF)函数

/**

  * 自定义UDAF操作:

  *  1、编写一个UDAF类,extends UserDefinedAggregateFunction

  *     复写其中的若干方法

  *  2、和UDF一样向SQLContext进行注册,该UDAF

  *  3、就直接使用

  *  模拟count函数

  *  可以参考在sparkCore中学习的combineByKey或者aggregateByKey

  */

object _03SparkSQlUDAFOps {

    def main(args: Array[String]): Unit = {

        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)

        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

        Logger.getLogger("org.project-spark").setLevel(Level.WARN)

 

        val spark = SparkSession.builder()

            .master("local[2]")

            .appName("_02SparkSQLUDFOps")

            //            .enableHiveSupport()

            .getOrCreate()

        //2、和UDF一样向SQLContext进行注册,该UDAF

        spark.udf.register("myCount", new MyCountUDAF())

 

        val topnDF = spark.read.json("data/sql/people.json")

        topnDF.createOrReplaceTempView("people")

        topnDF.show()

 

        println("------------------------------")

        //3、就直接使用

        val sql =

            """

              |select

              |  age,

              |  myCount(age) countz

              |from people

              |group by age

            """.stripMargin

        spark.sql(sql).show()

        spark.stop()

    }

}

/**

  * 自定义UDAF

  */

class MyCountUDAF extends UserDefinedAggregateFunction {

    //该udaf的输入的数据类型

    override def inputSchema: StructType = {

        StructType(List(

            StructField("age", DataTypes.IntegerType, false)

        ))

    }

    /**

      * 在该udaf聚合过程中的数据的类型Schema

      */

    override def bufferSchema: StructType = {

        StructType(List(

            StructField("age", DataTypes.IntegerType, false)

        ))

    }

 

    //该udaf的输出的数据类型

    override def dataType: DataType = DataTypes.IntegerType

 

    //确定性判断,通常特定输入和输出的类型一致

    override def deterministic: Boolean = true

    /**

        初始化的操作

      var sum = 1

      for(i <- 0 to 9) {

        sum += i

      }

      row.get(0)

      @param buffer 就是我们计算过程中的临时的存储了聚合结果的Buffer(extends Row)

     */

    override def initialize(buffer: MutableAggregationBuffer): Unit = {

        buffer.update(0, 0)//更新当前buffer数组中的第1列(索引为0)的值为0

    }

    /**

      * 分区内的数据聚合合并

      * @param buffer 就是我们在initialize方法中声明初始化的临时缓冲区

      * @param input  聚合操作新传入的值

      */

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

        val oldValue = buffer.getInt(0) //row.get(0)

        buffer.update(0,  oldValue + 1)

    }

    /**

      * 分区间的数据聚合合并

      * 聚合之后将结果传递给分区一

      * @param buffer1 分区一聚合的临时结果

      * @param buffer2 分区二聚合的临时结果

      *                reduce(v1, v2)

      */

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {

        val pav1 = buffer1.getInt(0)

        val pav2 = buffer2.getInt(0)

        buffer1.update(0, pav1 + pav2)

    }

    /**

      * 该聚合函数最终要返回的值

      * @param buffer 数据就被存储在该buffer中

      */

    override def evaluate(buffer: Row): Any = {

        buffer.getInt(0)

    }

}

3、SparkSQL开窗函数的使用

 * SparkSQL中的开窗函数的使用:

  *     row_number()         --->分组topN(必须掌握)

  *     sum() over()         --->分组累加

  *     avg/min/max() over() --->分组求最大

  *

  */

object _05SparkSQLWindowFuncOps {

    def main(args: Array[String]): Unit = {

        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)

        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

        Logger.getLogger("org.project-spark").setLevel(Level.WARN)

        val spark = SparkSession.builder()

            .master("local[2]")

            .appName("_04SparkSQLUDTFOps")

//            .enableHiveSupport()

            .getOrCreate()

        val topnDF = spark.read.json("data/sql/topn.json")

        topnDF.createOrReplaceTempView("stu_score")

        println("==================原始数据=========================")

        topnDF.show()

        println("===========计算各个科目学员成绩降序==================")

        val sql =

            """

              |select

              |    course,

              |    name,

              |    score,

              |    row_number() over(partition by course order by score desc) rank

              |from stu_score

            """.stripMargin

        spark.sql(sql).show()

        println("=========计算各个科目学员成绩降序Top3================")

//        val topnSQL =

//            """

//              |select

//              |    course,

//              |    name,

//              |    score,

//              |    row_number() over(partition by course order by score desc) rank

//              |from stu_score

//              |having rank < 4

//            """.stripMargin

        val topnSQL =

            """

              |select

              | tmp.*

              |from (

              |   select

              |      course,

              |      name,

              |      score,

              |      row_number() over(partition by course order by score desc) rank

              |   from stu_score

              |)tmp

              |where tmp.rank < 4

            """.stripMargin

        spark.sql(topnSQL).show

        spark.stop()

    }

}

所用pom.xml文件的配置:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.qyl</groupId>

  <artifactId>sparkSQL</artifactId>

  <version>1.0-SNAPSHOT</version>

  <inceptionYear>2008</inceptionYear>

  <properties>

    <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>

    <maven.compiler.source>1.8</maven.compiler.source>

    <maven.compiler.target>1.8</maven.compiler.target>

    <encoding>UTF-8</encoding>

    <scala.version>2.11.8</scala.version>

    <spark.version>2.3.2</spark.version>

    <hadoop.version>2.7.6</hadoop.version>

    <scala.compat.version>2.11</scala.compat.version>

  </properties>

 

  <repositories>

    <repository>

      <id>scala-tools.org</id>

      <name>Scala-Tools Maven2 Repository</name>

      <url>http://scala-tools.org/repo-releases</url>

    </repository>

  </repositories>

 

  <pluginRepositories>

    <pluginRepository>

      <id>scala-tools.org</id>

      <name>Scala-Tools Maven2 Repository</name>

      <url>http://scala-tools.org/repo-releases</url>

    </pluginRepository>

  </pluginRepositories>

 

  <dependencies>

    <dependency>

      <groupId>org.scala-lang</groupId>

      <artifactId>scala-library</artifactId>

      <version>${scala.version}</version>

    </dependency>

    <dependency>

      <groupId>junit</groupId>

      <artifactId>junit</artifactId>

      <version>4.4</version>

      <scope>test</scope>

    </dependency>

    <dependency>

      <groupId>org.specs</groupId>

      <artifactId>specs</artifactId>

      <version>1.2.5</version>

      <scope>test</scope>

    </dependency>

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-core_2.11</artifactId>

      <version>${spark.version}</version>

    </dependency>

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-sql_2.11</artifactId>

      <version>${spark.version}</version>

    </dependency>

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-hive_2.11</artifactId>

      <version>${spark.version}</version>

    </dependency>

    <!-- sparkStreaming -->

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-streaming_2.11</artifactId>

      <version>${spark.version}</version>

    </dependency>

    <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->

    <dependency>

      <groupId>org.dom4j</groupId>

      <artifactId>dom4j</artifactId>

      <version>2.0.0</version>

    </dependency>

    <dependency>

      <groupId>junit</groupId>

      <artifactId>junit</artifactId>

      <version>4.12</version>

      <scope>compile</scope>

    </dependency>

  </dependencies>

 

  <build>

    <!--<sourceDirectory>src/main/scala</sourceDirectory>

    <testSourceDirectory>src/test/scala</testSourceDirectory>-->

    <plugins>

      <plugin>

        <groupId>org.scala-tools</groupId>

        <artifactId>maven-scala-plugin</artifactId>

        <version>2.15.0</version>

        <executions>

          <execution>

            <goals>

              <goal>compile</goal>

              <goal>testCompile</goal>

            </goals>

          </execution>

        </executions>

        <configuration>

          <scalaVersion>${scala.version}</scalaVersion>

          <args>

            <arg>-target:jvm-1.5</arg>

          </args>

        </configuration>

      </plugin>

      <plugin>

        <groupId>org.apache.maven.plugins</groupId>

        <artifactId>maven-eclipse-plugin</artifactId>

        <configuration>

          <downloadSources>true</downloadSources>

          <buildcommands>

            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>

          </buildcommands>

          <additionalProjectnatures>

            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>

          </additionalProjectnatures>

          <classpathContainers>

            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>

            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>

          </classpathContainers>

        </configuration>

      </plugin>

      <plugin>

        <artifactId>maven-assembly-plugin</artifactId>

        <configuration>

          <descriptorRefs>

            <descriptorRef>jar-with-dependencies</descriptorRef>

          </descriptorRefs>

          <archive>

            <!--<manifest>

              <mainClass></mainClass>

            </manifest>-->

          </archive>

        </configuration>

        <executions>

          <execution>

            <id>make-assembly</id>

            <phase>package</phase>

            <goals>

              <goal>single</goal>

            </goals>

          </execution>

        </executions>

      </plugin>

      <plugin>

        <groupId>org.apache.maven.plugins</groupId>

        <artifactId>maven-compiler-plugin</artifactId>

        <configuration>

          <source>1.8</source>

          <target>1.8</target>

        </configuration>

      </plugin>

      <plugin>

        <groupId>org.codehaus.mojo</groupId>

        <artifactId>build-helper-maven-plugin</artifactId>

        <version>1.10</version>

        <executions>

          <execution>

            <id>add-source</id>

            <phase>generate-sources</phase>

            <goals>

              <goal>add-source</goal>

            </goals>

            <configuration>

              <!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->

              <sources>

                <source>src/main/java</source>

                <source>src/main/scala</source>

              </sources>

            </configuration>

          </execution>

        </executions>

      </plugin>

    </plugins>

  </build>

  <reporting>

    <plugins>

      <plugin>

        <groupId>org.scala-tools</groupId>

        <artifactId>maven-scala-plugin</artifactId>

        <configuration>

          <scalaVersion>${scala.version}</scalaVersion>

        </configuration>

      </plugin>

    </plugins>

  </reporting>

</project>


【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。