2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount

举报
Lansonli 发表于 2021/09/29 00:23:04 2021/09/29
【摘要】 目录 案例一:花式查询 案例二:WordCount 基于DSL编程 基于SQL编程 具体演示代码如下:   案例一:花式查询 package cn.itcast.sql import org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimp...

目录

案例一:花式查询

案例二:WordCount

基于DSL编程

基于SQL编程

具体演示代码如下:


 

案例一:花式查询


  
  1. package cn.itcast.sql
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{DataFrame, SparkSession}
  5. /**
  6.  * Author itcast
  7.  * Desc 演示SparkSQL的各种花式查询
  8.  */
  9. object FlowerQueryDemo {
  10.   case class Person(id:Int,name:String,age:Int)
  11.   def main(args: Array[String]): Unit = {
  12.     //1.准备环境-SparkSession
  13.     val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()
  14.     val sc: SparkContext = spark.sparkContext
  15.     sc.setLogLevel("WARN")
  16.     //2.加载数据
  17.     val lines: RDD[String] = sc.textFile("data/input/person.txt")
  18.     //3.切割
  19.     //val value: RDD[String] = lines.flatMap(_.split(" "))//错误的
  20.     val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))
  21.     //4.将每一行(每一个Array)转为样例类(相当于添加了Schema)
  22.     val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))
  23.     //5.将RDD转为DataFrame(DF)
  24.     //注意:RDD的API中没有toDF方法,需要导入隐式转换!
  25.     import spark.implicits._
  26.     val personDF: DataFrame = personRDD.toDF
  27.     //6.查看约束
  28.     personDF.printSchema()
  29.     //7.查看分布式表中的数据集
  30.     personDF.show(6,false)//false表示不截断列名,也就是列名很长的时候不会用...代替
  31.     //演示SQL风格查询
  32.     //0.注册表名
  33.     //personDF.registerTempTable("t_person")//已经过时
  34.     //personDF.createTempView("t_person")//创建表,如果已存在则报错:TempTableAlreadyExistsException
  35.     //personDF.createOrReplaceGlobalTempView("t_person")//创建全局表,可以夸session使用,查询的时候使用:SELECT * FROM global_temp.表名;生命周期太大,一般不用
  36.     personDF.createOrReplaceTempView("t_person")//创建一个临时表,只有当前session可用!且表如果存在会替换!
  37.     //1.查看name字段的数据
  38.     spark.sql("select name from t_person").show
  39.     //2.查看 name 和age字段数据
  40.     spark.sql("select name,age from t_person").show
  41.     //3.查询所有的name和age,并将age+1
  42.     spark.sql("select name,age,age+1 from t_person").show
  43.     //4.过滤age大于等于25的
  44.     spark.sql("select name,age from t_person where age >=25").show
  45.     //5.统计年龄大于30的人数
  46.     spark.sql("select count(age) from t_person where age >30").show
  47.     //6.按年龄进行分组并统计相同年龄的人数
  48.     spark.sql("select age,count(age) from t_person group by age").show
  49.     //演示DSL风格查询
  50.     //1.查看name字段的数据
  51.     import org.apache.spark.sql.functions._
  52.     personDF.select(personDF.col("name")).show
  53.     personDF.select(personDF("name")).show
  54.     personDF.select(col("name")).show
  55.     personDF.select("name").show
  56.     //2.查看 name 和age字段数据
  57.     personDF.select(personDF.col("name"),personDF.col("age")).show
  58.     personDF.select("name","age").show
  59.     //3.查询所有的name和age,并将age+1
  60.     //personDF.select("name","age","age+1").show//错误,没有age+1这一列
  61.     //personDF.select("name","age","age"+1).show//错误,没有age1这一列
  62.     personDF.select(col("name"),col("age"),col("age")+1).show
  63.     personDF.select($"name",$"age",$"age"+1).show
  64.     //$表示将"age"变为了列对象,先查询再和+1进行计算
  65.     personDF.select('name,'age,'age+1).show
  66.     //'表示将age变为了列对象,先查询再和+1进行计算
  67.     //4.过滤age大于等于25的,使用filter方法/where方法过滤
  68.     personDF.select("name","age").filter("age>=25").show
  69.     personDF.select("name","age").where("age>=25").show
  70.     //5.统计年龄大于30的人数
  71.     personDF.where("age>30").count()
  72.     //6.按年龄进行分组并统计相同年龄的人数
  73.     personDF.groupBy("age").count().show
  74.   }
  75. }

 

​​​​​​​案例二:WordCount

前面使用RDD封装数据,实现词频统计WordCount功能,从Spark 1.0开始,一直到Spark 2.0,建立在RDD之上的一种新的数据结构DataFrame/Dataset发展而来,更好的实现数据处理分析。DataFrame 数据结构相当于给RDD加上约束Schema,知道数据内部结构(字段名称、字段类型),提供两种方式分析处理数据:DataFrame API(DSL编程)和SQL(类似HiveQL编程),下面以WordCount程序为例编程实现,体验DataFrame使用。

基于DSL编程

使用SparkSession加载文本数据,封装到Dataset/DataFrame中,调用API函数处理分析数据(类似RDD中API函数,如flatMap、map、filter等),编程步骤:

 第一步、构建SparkSession实例对象,设置应用名称和运行本地模式;

 第二步、读取HDFS上文本文件数据;

 第三步、使用DSL(Dataset API),类似RDD API处理分析数据;

 第四步、控制台打印结果数据和关闭SparkSession;

 

基于SQL编程

也可以实现类似HiveQL方式进行词频统计,直接对单词分组group by,再进行count即可,步骤如下:

 第一步、构建SparkSession对象,加载文件数据,分割每行数据为单词;

 第二步、将DataFrame/Dataset注册为临时视图(Spark 1.x中为临时表);

 第三步、编写SQL语句,使用SparkSession执行获取结果;

 第四步、控制台打印结果数据和关闭SparkSession;

 

具体演示代码如下:


  
  1. package cn.itcast.sql
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
  5. /**
  6.  * Author itcast
  7.  * Desc 使用SparkSQL完成WordCount---SQL风格和DSL风格
  8.  */
  9. object WordCount {
  10.   def main(args: Array[String]): Unit = {
  11.     //1.准备环境
  12.     val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()
  13.     val sc: SparkContext = spark.sparkContext
  14.     sc.setLogLevel("WARN")
  15.     import spark.implicits._
  16.     //2.加载数据
  17.     //val rdd: RDD[String] = sc.textFile("data/input/words.txt")//可以使用该方式,然后使用昨天的知识将rdd转为df/ds
  18.     val df: DataFrame = spark.read.text("data/input/words.txt")
  19.     val ds: Dataset[String] = spark.read.textFile("data/input/words.txt")
  20.     //df.show()//查看分布式表数据
  21.     //ds.show()//查看分布式表数据
  22.     
  23.     //3.做WordCount
  24.     //切割
  25.     //df.flatMap(_.split(" ")) //注意:直接这样写报错!因为df没有泛型,不知道_是String!
  26.     //df.flatMap(row=>row.getAs[String]("value").split(" "))
  27.     val wordsDS: Dataset[String] = ds.flatMap(_.split(" "))
  28.     //wordsDS.show()
  29.     
  30.     //使用SQL风格做WordCount
  31.     wordsDS.createOrReplaceTempView("t_words")
  32.     val sql:String =
  33.       """
  34.         |select value,count(*) as count
  35.         |from t_words
  36.         |group by value
  37.         |order by count desc
  38.         |""".stripMargin
  39.     spark.sql(sql).show()
  40.     //使用DSL风格做WordCount
  41.     wordsDS
  42.       .groupBy("value")
  43.       .count()
  44.       .orderBy($"count".desc)
  45.       .show()
  46.     
  47.     /*
  48.     +-----+-----+
  49.     |value|count|
  50.     +-----+-----+
  51.     |hello|    4|
  52.     |  her|    3|
  53.     |  you|    2|
  54.     |   me|    1|
  55.     +-----+-----+
  56.     
  57.     +-----+-----+
  58.     |value|count|
  59.     +-----+-----+
  60.     |hello|    4|
  61.     |  her|    3|
  62.     |  you|    2|
  63.     |   me|    1|
  64.     +-----+-----+
  65.      */
  66.   }
  67. }

 

无论使用DSL还是SQL编程方式,底层转换为RDD操作都是一样,性能一致,查看WEB UI监控中Job运行对应的DAG图如下:

 

从上述的案例可以发现将数据封装到Dataset/DataFrame中,进行处理分析,更加方便简洁,这就是Spark框架中针对结构化数据处理模:Spark SQL模块

官方文档:http://spark.apache.org/sql/

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/115793854

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。