Spark SQL编程

举报
Smy1121 发表于 2019/06/22 16:19:58 2019/06/22
【摘要】 Spark SQL编程

Spark SQL编程

SparkSQL的依赖


不带Hive支持

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.10</artifactId>

<version>1.6.2</version>

</dependency>


带Hive支持(推荐使用)

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-hive_2.10</artifactId>

<version>1.6.2</version>

</dependency>



SparkSQL的入口:SQLContex


SQLContext是SparkSQL的入口

val sc: SparkContext

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._ //导?入各种sql操作的?口与各种隐式转换



SparkSQL的入口: HiveContext


HiveContext是SQLContext的子类,提供了对Hive的支持

complete HiveQL parser,

access to Hive UDFs

the ability to read data from Hive tables


编译时要包含Hive支持

mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Phive -Phive-thriftserver -DskipTests clean package


不需要提前安装Hive(连接已有Hive会在后面讲解)


HiveContext可以使用任何在SQLContext上可用的data source


   

SQLContext vs HiveContext


SQLContext现在只支持SQL语法解析器(SQL-92语法)

val sc: SparkContext

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._ //导⼊入各种sql操作的⼊口与各种隐式转换

SQLContext vs HiveContext


HiveContext现在支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器,来运行HiveSQL不支持的语法。


使用HiveContext可以使用Hive的UDF,读写Hive表数据等Hive操作。SQLContext不可以对Hive进行操作


Spark SQL未来的版本会不断丰富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最终可能两者会统一成一个Context


HiveContext包装了Hive的依赖包,把HiveContext单独拿出来,可以在部署基本的Spark的时候就不需要Hive的依赖包,需要使用HiveContext时再把Hive的各种依赖包加进来。



Spark SQL的作用与使用方式:

image.png


Spark程序中使用SparkSQL


轻松读取数据并使用SQL 查询,同时还能把这一过程和普通的Python/Java/Scala 程序代码结合在一起


CLI---Spark SQL shell

JDBC/ODBC

各种支持jdbc的软件、商业智能(BI)工具、平台



Spark SQL支持的API


SQL

DataFrame(推荐方式,也能执行SQL)

Dataset(还在发展)

image.png


SQL


支持basic SQL syntax/HiveQL

程序中使用SQL会返回DataFrame

command-line和JDBC/ODBC中均可以使用


   

SparkSQL数据源:从各种数据源创建DataFrame


因为 spark sql,dataframe,datasets 都是共用 spark sql 这个库的,三者共享同样的代码优化,生成以及执行流程,所以 sql,dataframe,datasets 的入口都是 sqlContext。


可用于创建 spark dataframe 的数据源有很多:

image.png


SparkSQL数据源:RDD


val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._


// Define the schema using a case class.

case class Person(name: String, age: Int)


// Create an RDD of Person objects and register it as a table.

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()


//

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

sqlContext.createDataFrame(people)



SparkSQL数据源:Hive

当从Hive 中读取数据时,Spark SQL 支持任何Hive 支持的存储格式(SerDe),包括文件、RCFiles、ORC、Parquet、Avro,以及Protocol Buffer(当然Spark SQL也可以直接读取这些文件)。


要连接已部署好的Hive,需要拷贝hive-site.xml、core-site.xml、hdfs-site.xml到Spark 的./conf/ 目录下即可

如果不想连接到已有的hive,可以什么都不做直接使用HiveContext:

Spark SQL 会在当前的工作目录中创建出自己的Hive 元数据仓库,叫作metastore_db


如果你尝试使用HiveQL 中的CREATE TABLE(并非CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的/user/hive/warehouse 目录中(如果你的classpath 中有配好的hdfs-site.xml,默认的文件系统就是HDFS,否则就是本地文件系统)。



SparkSQL数据源:Hive读写

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")


// Queries are expressed in HiveQL

sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)



SparkSQL数据源:访问不同版本的metastore

从Spark1.4开始,Spark SQL可以通过修改配置去查询不同版本的?Hive metastores(不用重新编译)

image.png



SparkSQL数据源:Parquet

Parquet(http://parquet.apache.org/)是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。

Parquet 格式经常在Hadoop 生态圈中被使用,它也支持Spark SQL 的全部数据类型。Spark SQL 提供了直接读取和存储Parquet 格式文件的方法。


val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._


// Define the schema using a case class.

case class Person(name: String, age: Int)


// Create an RDD of Person objects and register it as a table.

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()


people.write.parquet("xxxx")

val parquetFile = sqlContext.read.parquet("people.parquet")


//Parquet files can also be registered as tables and then used in SQL statements.

 parquetFile.registerTempTable("parquetFile")


val agers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

agers.map(t => "Name: " + t(0)).collect().foreach(println)



SparkSQL数据源:Parquet-- Partition Discovery


在Hive中通常会用分区表来优化性能,比如:

image.png


SQLContext.read.parquet或者SQLContext.read.load只需要指定path/to/table,SparkSQL会自动从路径中提取分区信息,返回的DataFrame 的schema 将是:

image.png


当然你可以使用Hive读取方式:

hiveContext.sql("FROM src SELECT key, value").



SparkSQL数据源:Json

SparkSQL支持从Json文件或者Json格式的RDD读取数据

val sqlContext = new org.apache.spark.sql.SQLContext(sc)


// 可以是目录或者文件夹

val path = "examples/src/main/resources/people.json"

val people = sqlContext.read.json(path)


// The inferred schema can be visualized using the printSchema() method.

people.printSchema()


// Register this DataFrame as a table.

people.registerTempTable("people")


// SQL statements can be run by using the sql methods provided by sqlContext.

val agers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")


// Alternatively, a DataFrame can be created for a JSON dataset represented by

// an RDD[String] storing one JSON object per string.

val anotherPeopleRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)

val anotherPeople = sqlContext.read.json(anotherPeopleRDD)



SparkSQL数据源:JDBC

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:postgresql:dbserver","dbtable" -> "schema.tablename")).load()

   

支持的参数:

image.png

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。