Spark之【SparkSQL编程】系列(No1)——《SparkSession与DataFrame》
【摘要】
上一篇博客已经为大家介绍完了SparkSQL的基本概念以及其提供的两个编程抽象:DataFrame和Data...
上一篇博客已经为大家介绍完了SparkSQL的基本概念以及其提供的两个编程抽象:DataFrame和DataSet,本篇博客,博主要为大家介绍的是关于SparkSQL编程的内容。考虑到内容比较繁琐,故分成了一个系列博客。本篇作为该系列的第一篇博客,为大家介绍的是SparkSession与DataFrame。
码字不易,先赞后看,养成习惯!
SparkSQL编程
1. SparkSession
在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
2. DataFrame
2.1 创建
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。
在正式开始之前,我们需要准备数据源。
vim /opt/data/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
将其上传到集群上。
hadoop fs -put /opt/data/people.json /input
ok~
1) 从Spark数据源进行创建
(1) 查看Spark数据源进行创建的文件格式,
spark.read.
按tab键表示显示:
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
(2)读取json文件创建DataFrame
注意:spark.read.load
默认获取parquet
格式文件
scala> val df = spark.read.json("/input/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
(3)展示结果
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
2)从RDD中转换
参照第2.5节的内容:DateFrame 转换为RDD
3) 从Hive Table进行查询返回
这个将在后面的博文中涉及到,这里暂且不谈。
2.2 SQL风格语法 (主要)
1)创建一个DataFrame
scala> val df = spark.read.json("/input/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)对DataFrame创建一个临时表
scala> df.createOrReplaceTempView("people")
3)通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
4)结果展示
scala> sqlDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
注意:临时表是Session范围的,Session退出后,表就失效了。如果想应用范围内仍有效,可以使用全局表。注意使用全局表时需要全路径访问,如:global_temp:people。
全局的临时视图存在于系统数据库 global_temp中,我们必须加上库名去引用它
5)对于DataFrame创建一个全局表
scala> df.createGlobalTempView("people")
6)通过SQL语句实现查询全表
scala> spark.sql("select * from global_temp.people").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
3. DSL 风格语法 (次要)
1)创建一个DataFrame
scala> val df = spark.read.json("/input/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)查看DataFrame的Schema信息
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
3)只查看"name"列数据
scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
4)查看"name"列数据以及"age+1"数据
scala> df.select($"name", $"age" + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
5)查看"age"大于"21"的数据
scala> df.filter($"age" > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
6)按照"age"分组,查看数据条数
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
2.4 RDD转换为DateFrame
注意:如果需要RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._
【spark不是包名,而是sparkSession对象的名称】
准备工作:
数据文件people.txt
vim /opt/data/people.txt
zhangsan,17
lisi,20,
wangwu,19上传至hdfs集群
hdfs dfs -put /opt/data/people.txt /input
前置条件: 导入隐式转换并创建一个RDD
scala> import spark.implicits._
import spark.implicits._
scala> val peopleRDD = sc.textFile("/input/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
1)通过手动确定转换
scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int]
2)通过反射确定(需要用到样例类)
<1>创建一个样例类
scala> case class People(name:String, age:Int)
<2>根据样例类将RDD转换为DataFrame
scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF
res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
3)通过编程的方式(了解)
<1>导入所需的类型
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
<2>创建Schema
scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
<3>导入所需的类型
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
<4>根据给定的类型创建二元组RDD
scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33
<5>根据数据及给定的schema创建DataFrame
scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
2.5 DateFrame 转换为RDD
直接调用rdd即可。
1) 创建一个DataFrame
scala> val df = spark.read.json("/input/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)将DataFrame转换为RDD
scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29
3)打印RDD
scala> dfToRDD.collect
res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])
好了,本次的分享就到这里。下一篇博客将为大家带来DataSet
的内容,敬请期待!!!
文章来源: alice.blog.csdn.net,作者:大数据梦想家,版权归原作者所有,如需转载,请联系作者。
原文链接:alice.blog.csdn.net/article/details/104588268
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)