Spark基础学习笔记24:Spark SQL数据源

举报
howard2005 发表于 2022/05/03 23:35:16 2022/05/03
【摘要】 文章目录 零、本讲学习目标一、基本操作(一)默认数据源1、默认数据源Parquet2、案例演示读取Parquet文件(1)在Spark Shell中演示(2)通过Scala程序演示 ...

零、本讲学习目标

  1. 学会使用默认数据源
  2. 学会手动指定数据源
  3. 理解数据写入模式
  4. 掌握分区自动推断

Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用相关转换算子进行操作,也可以用于创建临时视图。将DataFrame注册为临时视图可以对其中的数据使用SQL查询。

一、基本操作

  • Spark SQL提供了两个常用的加载数据和写入数据的方法:load()方法和save()方法。load()方法可以加载外部数据源为一个DataFrame,save()方法可以将一个DataFrame写入指定的数据源。

(一)默认数据源

1、默认数据源Parquet

  • 默认情况下,load()方法和save()方法只支持Parquet格式的文件,Parquet文件是以二进制方式存储数据的,因此不可以直接读取,文件中包括该文件的实际数据和Schema信息,也可以在配置文件中通过参数spark.sql.sources.default对默认文件格式进行更改。Spark SQL可以很容易地读取Parquet文件并将其数据转为DataFrame数据集。

2、案例演示读取Parquet文件

  • 将数据文件users.parquet上传到master虚拟机/home
    在这里插入图片描述
  • 将数据文件users.parquet上传到HDFS的/input目录
    在这里插入图片描述

(1)在Spark Shell中演示

  • 启动Spark Shell
    在这里插入图片描述
  • 加载parquet文件,返回数据帧
  • 执行命令:val userdf = spark.read.load("hdfs://master:9000/input/users.parquet")
    在这里插入图片描述
  • 执行命令:userdf.show(),查看数据帧内容
    在这里插入图片描述
  • 执行命令:userdf.select("name", "favorite_color").write.save("hdfs://master:9000/result"),对数据帧指定列进行查询,查询结果依然是数据帧,然后通过save()方法写入HDFS指定目录
    在这里插入图片描述
  • 查看HDFS上的输出结果
    在这里插入图片描述
  • 除了使用select()方法查询外,也可以使用SparkSession对象的sql()方法执行SQL语句进行查询,该方法的返回结果仍然是一个DataFrame。
  • 基于数据帧创建临时视图,执行命令:userdf.createTempView("t_user")在这里插入图片描述
  • 执行SQL查询,将结果写入HDFS,执行命令:spark.sql("select name, favorite_color from t_user").write.save("hdfs://master:9000/result2")
    在这里插入图片描述
  • 查看HDFS上的输出结果
    在这里插入图片描述

(2)通过Scala程序演示

  • 创建Maven项目 - SparkSQLDemo
    在这里插入图片描述
  • 在pom.xml文件里添加依赖与插件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.hw.sparksql</groupId>
    <artifactId>SparkSQLDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 创建net.hw.sparksql包,在包里创建ReadParquet对象
    在这里插入图片描述
package net.hw.sparksql

import org.apache.spark.sql.SparkSession

/**
 * 功能:Parquet数据源
 * 作者:华卫
 * 日期:2022年05月01日
 */
object ReadParquet {
  def main(args: Array[String]): Unit = {
    // 本地调试必须设置,否则会报Permission Denied错误
    System.setProperty("HADOOP_USER_NAME", "root")
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("ReadParquet")
      .master("local[*]")
      .getOrCreate()
    // 加载parquet文件,返回数据帧
    val usersdf = spark.read.load("hdfs://master:9000/input/users.parquet")
    // 显示数据帧内容
    usersdf.show()
    // 查询DataFrame中指定列,结果写入HDFS
    usersdf.select("name","favorite_color")
      .write.save("hdfs://master:9000/result3")
  }
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 运行程序,查看控制台结果
    在这里插入图片描述
  • 在HDFS查看输出结果
    在这里插入图片描述

(二)手动指定数据源

1、format()与option()方法概述

  • 使用format()方法可以手动指定数据源。数据源需要使用完全限定名(例如org.apache.spark.sql.parquet),但对于Spark SQL的内置数据源,也可以使用它们的缩写名(JSON、Parquet、JDBC、ORC、Libsvm、CSV、Text)。
  • 通过手动指定数据源,可以将DataFrame数据集保存为不同的文件格式或者在不同的文件格式之间转换。
  • 在指定数据源的同时,可以使用option()方法向指定的数据源传递所需参数。例如,向JDBC数据源传递账号、密码等参数。

2、案例演示

(1)读取房源csv文件

  • 查看HDFS上/input目录里的house.csv文件
    在这里插入图片描述
  • 在spark shell里,执行命令:val house_csv_df = spark.read.format("csv").load("hdfs://master:9000/input/house.csv"),读取房源csv文件,得到房源数据帧
    在这里插入图片描述
  • 执行命令:house_csv_df.show(),查看房源数据帧内容
    在这里插入图片描述
  • 大家可以看到,house.csv文件第一行是字段名列表,但是转成数据帧之后,却成了第一条记录,这样显然是不合理的,怎么办呢?就需要用到option()方法来传递参数,告诉Spark第一行是表头header,而不是表记录。
  • 执行命令:val house_csv_df = spark.read.format("csv").option("header", "true").load("hdfs://master:9000/input/house.csv")
    在这里插入图片描述
  • 执行命令:house_csv_df.show(),查看房源数据帧内容
    在这里插入图片描述

(2)格式转换 - 读取json,保存为parquet

文章来源: howard2005.blog.csdn.net,作者:howard2005,版权归原作者所有,如需转载,请联系作者。

原文链接:howard2005.blog.csdn.net/article/details/124350981

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。