大数据技术之RDD的概述

举报
tea_year 发表于 2024/06/10 18:06:27 2024/06/10
【摘要】 一、RDD的基本概念接下来我们来学习RDD的一些核心概念:RDD是Spark中最重要的概念之一,全称叫弹性分布式数据集,我们可以简单的把它理解为提供了很多操作接口的数据集合。但是它跟我们一般使用的数据集不同的点在哪里呢?比如平时我们用Python或java开发程序时,操作的数据集合都是存放在我们单台的计算机上的,但是RDD实际上是把数据以分布式的方式存储在一批机器的内存或磁盘当中,这个概念跟...



一、RDD的基本概念

接下来我们来学习RDD的一些核心概念:

RDD是Spark中最重要的概念之一,全称叫弹性分布式数据集,我们可以简单的把它理解为提供了很多操作接口的数据集合。但是它跟我们一般使用的数据集不同的点在哪里呢?比如平时我们用Python或java开发程序时,操作的数据集合都是存放在我们单台的计算机上的,但是RDD实际上是把数据以分布式的方式存储在一批机器的内存或磁盘当中,这个概念跟HDFS中的文件块是很类似的。我们可以结合下图去理解:



比如我们定义了一个叫myRDD的RDD,那么实际上可以看到我们在操作的时候是针对这个myRDD去调用方法进行处理的,但实际上这个数据集在存储时,有可能被拆分在多个分区多个partition去进行存放的,那么这个partition跟HDFS中的文件块是不是就很类似了?!那么每个分区实际上都可能被存放在不同机器的内存或磁盘上。这个是可以根据实际的业务场景进行调整的。

那为什么会管它叫弹性分布式数据集呢?分布式已经讲了,就是因为它在存储层面上可以被存放在不同的节点上的;但是这当中的弹性又从何体现呢?我们可以结合RDD的特点给大家进行说明:

1. RDD在进行使用的时候,一旦我们把它创建出来了比如myRDD,就不能再去对这个RDD对象进行改变了;可能有人会说它不是提供了一些转换的方法吗?确实如此,但是要知道我们针对RDD调用了一些转换方法之后呢,生成了一些新的RDD,这个新RDD就是一个全新的RDD了,它是不会针对原有的RDD进行改变的;

所以我们看到第二条说:

2. 可以通过并行转换的方式来创建RDD,而不是改变RDD

3. RDD在进行计算的过程当中,假如说任务计算失败了,它是可以被自动重建的,不需要我们再从头到尾一步步把之前的操作再自己做一遍,这个就归功于spark的DAG图了

4. RDD的存储级别是可以去调整的,并没有限制死说数据必须存放在磁盘中,或者说必须存放在内存中。这里可以指定存储级别,来去对数据进行一个复用。那假如说内存资源有限的话呢,它会自动的把存储级别降为磁盘存储,虽然说这种情况下,整个的运算速度会相对降低的比较多,但也不会比MapReduce更慢了。

5. 最后一点其实也是在说明RDD失败可以自动重建的特点,假如节点出现了故障,数据丢失了,RDD是可以通过DAG图当中记录的血缘关系也就是它的lineage(血统、谱系)去溯源重新计算出来这个分区的数据,所以总的说来RDD的弹性就主要体现在它的存储弹性跟它的容错弹性上。


接下来看,这是RDD的一部分算子,或者说方法:

这里简单给大家看一眼,后续我们进行详细解读:

这里可以看到RDD的算子会分为两种类型:

第一种叫做转换算子——Transformations:比如map或filter这些,转换类型的算子有什么特点呢?在于它们的执行是有惰性的,比如咱们针对myRDD调用了一个map方法去进行处理,在我们把代码提交之后,后台是不是马上会帮我们去做转换形成一个新的RDD呢?其实并不会,spark在碰到Transformation这个类型的算子时,它首先会把当前的操作给记录下来,知道说我要对这个RDD做这步转换了,但它不会马上帮你去提交到Executor去做这个计算,直到它碰到了下面这些Actions类型的算子,也就是行动类型的这样一些操作的时候,它才会把前面一系列的针对数据的一些处理去给你跑出来。

可能有人会说这也太复杂了,我学个spark还要特意去记哪些方法是有惰性的,哪些方法是可以立刻执行的,但是,其实行动算子是有一定的规律的,一般来说假如是将计算结果进行返回,或者说写入到存储系统中的,一般就属于是行动算子。


接下来给大家讲几个概念方便大家去理解spark的方法或DAG图构建的一些原理:

DAG图本质上是在记录说:当前我们去做处理的这些数据,它要怎么去计算速度会更快、效率会更高,而计算本质上就是涉及到前面我们所说的这样一些算子、方法的调用,所以接下来我们给大家讲的概念,大家可以结合存储层面上的情况还有方法的效果,去进行思考:

(这里第一次听可能会不太好消化,大家可以尽量去结合这两页的PPT进行理解,后续讲到RDD的算子、查看过WEBUI上的DAG图之后呢,大家应该就能想通了)

这里先介绍两个概念——宽依赖 & 窄依赖

首先,窄依赖是指子RDD的分区只依赖于每一个父RDD的单个分区。

那么什么叫子RDD,什么叫父RDD呢?我们可以结合下面这个图去理解:

这里每一个蓝色的框都表示一个RDD,里面蓝色的小方块就表示RDD里的分区。后面的这一个RDD是从前面去衍生出来的,所以后面的这个新的就叫子RDD,它的来源叫父RDD。Map filter这个是它只来源于单个的父RDD,join这个是有两个父RDD的。

搞清楚子RDD和父RDD之后,我们再来看窄依赖的概念:

刚说了,什么叫窄依赖呢?就是子RDD的分区只依赖于每一个父RDD的单个分区。它又可以衍生出上面所说的这两种情况:一个父RDD的分区对应于一个子RDD的分区或者多个父RDD的分区对应于一个子RDD的分区。

宽依赖:表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。光听这些概念感觉很复杂,但其实也很简单。看上图:

左侧三个都属于是窄依赖,右侧这两个属于宽依赖。大家先不用懂这些方法起到什么作用,只要先看到这些图示都是按照方法的作用去画的。你只要先从图里去找到它们连接的关系或规律就可以了。

那么我们去观察左右这两个图可以发现什么呢?可以看到右侧两图它们的连线是更复杂的,这些连线说明了子RDD的每一个分区是来源于哪一些分区的。那就很明显,宽依赖这边的情况就是子RDD中的每一个分区它都需要从父RDD的多个分区里去找数据过来,所以它的连线是更加复杂的。而窄依赖就简单很多了,它的每一个子RDD分区都只需要从父RDD单个分区去找一遍就可以了。不存在那种要找同一个爸爸好多次那种情况。

那么宽窄依赖跟DAG图有啥关系呢?我们往下看:

我们来看上面这个简单的DAG图,这里先介绍一下这个图是怎样的:

这里每个长条的框都表示一个RDD,最左侧这个就相当于是读取进来的两份数据,形成了两个RDD。然后这整个DAG描述的就是基于这两个RDD所做的一系列处理,最终形成了最右边这样一个RDD的过程。

大家先观察下图,然后来思考两个问题:

1. 它是怎么来划分stage的

2. 划分好stage,它对于我们task小任务的运行又有什么样的好处?

可以看到在最左侧,针对左上方的RDD做的处理都被单独划分成了一个stage0,下面这个针对下面这个RDD所做的处理都被划分成了单独的stage1,右边这一大块呢,也被划分成了单独的stage2。那为什么这么去划分?它是怎么划分的呢?

假如没有看出来的话,可以翻回上一页看一下:很明显它是针对RDD之间的依赖关系去划分stage的,只要它碰到了宽依赖,它就断开,把前面的一系列的窄依赖划分成一个stage。

它采用这样一个划分方式有什么好处呢?

这就跟task有关系了,大家可以想一下,这里每一个stage中,最终的子RDD每一个分区都是从单个分区中得到的,左侧这两个stage里的RDD应该没问题吧;右侧这个stage,最右侧结果RDD里面的每个分区,往左溯源,可以看到都只需要从单个父RDD的单个分区就可以得到了,就算它有两个父RDD,它也是分别从里面的某一个分区里把数据提取出来,形成这一个分区就可以了。

所以说在这种情况下,spark就可以把每个分区的计算单划分成一个task,然后将若干个task同时交由Executor去做并行的计算,这个时候就可以同时把这四个分区的运算就都做完了?这样就提高了整体的计算效率。这个其实就是DAG图的意义。

另外,从这个图中也可以看出,假如其中某个节点宕机,导致某个分区数据丢失,就可以根据DAG图当中的记录去溯源,找回说我要找回这个丢失的分区数据,它前面要找到哪个父RDD去计算就可以了。那么这个溯源它根据的就是这个分区它的lineage。


二、创建RDD

从这里开始呢,就进入spark编程的内容了。RDD的创建方法大体有三种:

1. 从集合中创建RDD:利用现有的数据集合创建

2. 从外部存储创建RDD:从外部数据文件创建RDD     —— 文本文件、JSON、CSV文件

3. 从其他RDD创建:利用现有的RDD去生成转换,涉及到RDD算子的使用


先搁置3,依次来看1和2:

1. 从集合中创建RDD:parallelize()函数

语法格式:parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T]

Parallelize Rdd默认分区数:sc.defaultParallelism,可通过spark.default.parallelism设置sc.defaultParallelism的值,没有配置spark.default.parallelism时的默认值等于cpu的核数

scala> val list=List("a","b","c","d")

scala> val rdd1=sc.parallelize(list)

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24

这里的返回信息告诉我们,当前已经创建了一个rdd了,它里面存放的是一些字符串数据。

有了这么一个RDD之后,就可以查看一下这个RDD当前的分区情况:

scala> rdd1.partitions.size

res8: Int = 2

这里看到rdd1当前的分区数是2,那为什么是2呢?这个默认值其实是根据当前集群中计算节点的cpu核数来定的,如果我们忘了集群参数配置的话,其实可以看一下WEBUI——hadoop102:8085:

这里看到集群有两个计算节点worker,它们分别都是1核的,加起来就是2核,所以集群RDD默认都是分2个区存储的。

那么假如我们想去修改这个rdd1的分区个数,如何操作呢?

第一个方式:就是使用parallelize()去创建rdd时,直接通过它后边的参数去指定分区个数:

scala> val rdd2=sc.parallelize(list,3)

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

再来查看分区数,就变成了3个分区:

scala> rdd2.partitions.size

res1: Int = 3

Spark会为每个分区运行一个Task任务处理,Spark建议用户,为集群中的每个CPU创建2-4个分区。


第二种方式:直接修改默认分区数

获取当前sc配置,利用set方法去设置新的spark的缺省分区参数

scala> val new_conf=sc.getConf.set("spark.default.parallelism","3")



因为一次只能有一个sc环境参数,所以必须先stop关闭当前sc

scala> sc.stop



导入SparkContext,重新创建一个带新参数的sc对象

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala> val sc=new SparkContext(new_conf)
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6fe04d9d
scala> val rdd3=sc.parallelize(list)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> rdd3.partitions.size
res9: Int = 3


练习:把默认分区数改回成2个:

scala> val new_conf=sc.getConf.set("spark.default.parallelism","2")
scala> sc.stop


scala> import org.apache.spark.SparkContext

import org.apache.spark.SparkContext


scala> val sc=new SparkContext(new_conf)

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6fe04d9d


在Spark编程模式下,Spark还提供了makeRDD()方法来创建RDD,其使用与parallelize()类似,区别在于它会根据数据集合对象的不同创建最佳分区。如:

scala> val seq=List(("a",List("a1","a2","a3")),("b",List("b1","b2")),("c",List("c1")))

scala> val rdd4=sc.parallelize(seq)

scala> rdd4.partitions.size

res8: Int = 2    

scala> val rdd5=sc.makeRDD(seq)

scala> rdd5.partitions.size

res9: Int = 3


2. 从外部存储创建RDD:第二种方法是使用textFile直接加载数据文件为RDD

2.1 使用文本文件创建RDD

textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

例如,我们Linux本地创建一个aa.txt数据文件,我们如何将其读取成一个RDD呢?用textFile()方法

[syf@hadoop102 spark-3.2.1]$ vim a.txt

hello spark

hello scala spark

:
scala> val localdata=sc.textFile("file:///opt/module/spark-3.2.1/aa.txt")

localdata: org.apache.spark.rdd.RDD[String] = file:///opt/module/spark-3.2.1/aa.txt MapPartitionsRDD[4] at textFile at <console>:26


这里看似这个外部文件已经被成功读取,但是当我们去调用take方法查看它的第1行时,会发现:报错!

scala> localdata.take(1)

23/03/27 12:26:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1) (192.168.200.104 executor 0): java.io.FileNotFoundException: File file:/opt/module/spark-3.2.1/aa.txt does not exist


文件不存在是为什么呢?因为spark是做一个分布式计算,假如想让集群从本地文件系统中读取某一份文件进来进行RDD计算的时候,这里就得保证说做计算的这部分节点上、或者说集群上同样路径下都要有同样的数据源文件才可以。所以我们这里通过xsync将aa.txt文件同步到集群其它机器方可。

scala> localdata.take(1)

res9: Array[String] = Array(hello spark)

这样就可以拿到数据了

所以注意1:如果读取的是本地文件系统的数据的话,要注意集群中的数据同步情况。

注意2:在没有同步文件之前,当我们敲击回车读取代码时,没有报错。即使我们改一个不存在的数据文件名,也不会报错,还会返回数据类型RDD[String]:

scala> val localdata=sc.textFile("file:///opt/module/spark-3.2.1/bb.txt")

localdata: org.apache.spark.rdd.RDD[String] = file:///opt/module/spark-3.2.1/bb.txt MapPartitionsRDD[8] at textFile at <console>:26


但是,当我们真正去使用localdata调用take()方法去查看数据时,就报错了,提示没有这份数据。原因就是我们之前提到过的惰性运行的概念,只有在碰到take 保存、查看这样的操作时,它才去真正运行之前的数据处理,发现错误并报出。


刚刚是从本地文件系统中去读取数据,需要在集群中同步数据。但如果是在hdfs分布式文件系统上去读取数据文件的话,就没这么麻烦了。只要确保hdfs这个路径下确实存在该文件,只要把路径写清楚即可,还不需要加前缀:

上传文件:

[syf@hadoop102 spark-3.2.1]$ hdfs dfs -put aa.txt /
scala> val hdfsdata=sc.textFile("/aa.txt")

hdfsdata: org.apache.spark.rdd.RDD[String] = /aa.txt MapPartitionsRDD[1] at textFile at <console>:23

scala> hdfsdata.take(5)

res0: Array[String] = Array(hello spark, hello scala spark)
//统计aa.txtWENJIAN 包含的字符个数

scala>hdfsdata.map(line=>line.length).reduce(_+_)

res12: Int = 36

2.2使用JSON文件创建RDD

从文件内容可以看出每个{...}结构都保存了一个对象,即一条记录,这个JSON文件包含了若干对象

读取JSON文件创建RDD最简单的方法是将JSON文件当做文本文件,使用textFile()读取

方法1:创建student.json文件:

{"学号":"106","姓名":"李明","数据结构":"92"}

{"学号":"242","姓名":"李乐","数据结构":"96"}

{"学号":"107","姓名":"冯涛","数据结构":"84"}


scala> val jsonStr=sc.textFile("file:///opt/module/spark-3.2.1/student.json")
jsonStr: org.apache.spark.rdd.RDD[String] = file:///opt/module/spark-3.2.1/student.json MapPartitionsRDD[1] at textFile at <console>:23
scala> jsonStr.collect


方法2:使用样例类解析JSON中的对象,即将JSON中的对象读入含有数据结构的样例类中。然后根据样例类的格式解析JSON中的对象:

//隐式转换必须要导入:

scala> import org.json4s._
import org.json4s._
scala> import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.JsonMethods._
scala> case class Student(学号:String, 姓名:String, 数据结构:String)
defined class Student
//定义隐式参数formats,它是parse和extract方法转换数据所依赖的参数
scala> implicit val formats = DefaultFormats
formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats$@78227f53
scala> val format_json = jsonRDD.collect.map{x=>parse(x).extract[Student]}
format_json: Array[Student] = Array(Student(106,李明,92), Student(242,李乐,96), Student(107,冯涛,84))

scala> format_json.foreach(println)

Student(106,李明,92)

Student(242,李乐,96)

Student(107,冯涛,84)



2.3 使用CSV文件创建RDD

scala> val gradeRDD=sc.textFile("file:///opt/module/spark-3.2.1/grade.csv")
gradeRDD: org.apache.spark.rdd.RDD[String] = file:///opt/module/spark-3.2.1/grade.csv MapPartitionsRDD[9] at textFile at <console>:46
scala> val result =gradeRDD.map{line=>val ls=line.split(",");(ls(0),ls(1),ls(2))}
result: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[10] at map at <console>:46
scala> result.collect.foreach(println)
(101,Jack,95)
(102,Mary,90)
(103,Tom,96)


或者使用系统提供的解析方法:

scala> import java.io.StringReader
import java.io.StringReader
scala> import au.com.bytecode.opencsv.CSVReader
import au.com.bytecode.opencsv.CSVReader
scala> val result2=gradeRDD.map{line=>val reader =new CSVReader(new StringReader(line));reader.readNext()}
result2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[11] at map at <console>:48
scala> result2.collect
res16: Array[Array[String]] = Array(Array(101, Jack, 95), Array(102, Mary, 90), Array(103, Tom, 96))



练习1、将三份数据文件student.txt(学生信息表),result_bigdata.txt(大数据基础成绩表),result_math.txt(数学成绩表)上传至hdfs的/rdd_data目录下,然后启动spark-shell:

加载student.txt为名称为student的RDD数据,result_bigdata.txt为名称为bigdata的RDD数据,result_math.txt为名称为math的RDD数据。

使用take方法查看每个RDD数据内容。


三、RDD算子

下面我们给大家讲解Transformation类型的算子:

(一)映射操作

1. map:是在前面的课程中出镜率比较高的一个方法,那么它的功能到底是什么呢?

使用场景一:帮我们把现有的RDD的每个数据项通过给进map中的用户自定义函数f去进行处理,生成一个新的数据项,很明显这个就是一个一一映射的操作,所以map是不会改变当前RDD的分区数目。

它所需要的参数就是我们针对数据项的一个自定义函数。

来看一个简单的例子:

例如现在有一个RDD:里面存放了(1,2,3,4),咱们想让其中的每一个数都做一个求平方的操作,怎么完成呢?

例1:已知一个数据集合(1,2,3,4),如何对其中每一个数做求平方的操作?

scala> val data= sc.parallelize(List(1,2,3,4))        //先把这个RDD构建出来

data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23


RDD构建完毕后,可以先通过collect去查看一下当前RDD中的每个元素。

scala> data.collect        //collect是把RDD当中的全部元素都返回回来作为一个数组。

res0: Array[Int] = Array(1, 2, 3, 4)
scala> data.take(3)        //take是按照我们指定的数量给我们把元素返回回来,比如返回3条(按照数组的格式返回具体3条元素)

res1: Array[Int] = Array(1, 2, 3)


那么如何对其中的每一个值对其求平方呢?

scala> val square=data.map(x=>x*x)    //针对data调用map,再对其中的每一个数据项X做求平方的操作。

square: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23


再用collect去查看squrare的每一个值。


使用场景二:使用map算子将一个普通的RDD,转换为一个键值对的RDD,后续可供只能提供“键值对”类型RDD的算子操作使用。

例2:对一个由英文单词组成的文本行("Who is her", "What r u doing", "Here you are"),提取其中的第一个单词作为key,将整个句子作为value,建立“键值对”类型的RDD ——

((“Who”, “Who is her”), (“What”, “What r u doing”), (“Here”, “Here you are”)


scala> val wordsRDD = sc.parallelize(List("Who is her", "What r u doing", "Here you are"))
wordsRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:23
scala> val pairRDD = wordsRDD.map(x=>(x.split(" ")(0),x))
pairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:23
scala> pairRDD.collect.foreach(println)

(Who,Who is her)
(What,What r u doing)
(Here,Here you are)



2. flatMap:也是一个Transformation类型的算子,它跟map特别像:对集合中的每个元素进行map自定义函数处理之后再进行扁平化或者说降维。

例如:flatMap的一个典型使用场景是:将输入的字符串分割为单词

例1:

scala> val data=sc.parallelize(List("I am learning spark","I like spark"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:23
scala> val fmData=data.flatMap(x=>x.split(" "))
fmData: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:23
scala> val mapData = fmData.map((_,1))
mapData: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at map at <console>:23
scala> mapData.collect

res5: Array[(String, Int)] = Array((I,1), (am,1), (learning,1), (spark,1), (I,1), (like,1), (spark,1))



3. mapPartitions:看起来跟前两个是近亲,但是它还是跟前两个是不一样的,因为maprPartitions的一个map操作是针对每一个分区、每一个partitions去进行映射的。就相当于之前我们在map的自定义函数中定义的形参x,map它拿到的x就是RDD中的每一个元素,但是mapPartitions拿到的每一个元素就是分区的数据,是一个迭代器。


例1:创建(1 to 10)的RDD,取出每个分区中大于3的值

构建一个RDD,里面的值是1 to 10

scala> val rdd=sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:23


查看一下数据的分区数:

scala> rdd.partitions.size

res5: Int = 2

查看分区数据:

scala> rdd.glom().collect

res8: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))


在自定义函数里,这是我们接收到的每一个内容是一个分区的数据集合,所以这里为了区分,我们不再用形参x指定它,而是使用iter,那我们针对每个分区的数据都是通过调用filter进行过滤,这里filter过滤方法也是需要给进去一个自定义函数,告诉它我要用一个什么条件进行过滤,那我们这里的判断条件就是x>3

scala> val mpRDD=rdd.mapPartitions(iter=>iter.filter(x=>x>3))

// 这里可以简化为:rdd.mapPartitions(_.filter(_>3))

mpRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at mapPartitions at <console>:23


查看一下,就把数据当中的4 5 6 7 8 9 10返回了

scala> mpRDD.collect

res6: Array[Int] = Array(4, 5, 6, 7, 8, 9, 10)


使用glom().colletc去查看一下:

scala> mpRDD.glom().collect

res7: Array[Array[Int]] = Array(Array(4, 5), Array(6, 7, 8, 9, 10))


这就是mapPartitions的使用方法

一般在做大数据处理的时候,咱们是可以灵活决定去使用map还是mapPartitions,为什么呢?

大家可以想象,假如你的数据这里有n个元素,在你使用map方法进行处理的时候,你这里面的函数就要执行n次对吧,假如你这n个元素存放在50个分区里面,那么在你去通过mapPartitions进行同样的处理的时候,你这里面的函数就只需要被执行50次就可以了,它的性能相对来讲要高一些。

但是呢也是因为你每一次针对每一个分区里的数据量它是更多的,是量更大的,所以它对计算资源的需求也会更高。假如说这个时候超出了你的处理内存,它就会给你报错。

但是如果你用的是map,一般通过自动回收内存处理,就可以避免这样一个问题。

所以这里大家就需要根据实际业务情况去进行一个选择即可。


(二)过滤和去重操作

4. filter:这里是前面已经露过面的filter,它是针对我们数据中的元素进行过滤的,所以这里面给进去的是一个自定义函数,用这个自定义函数来告诉它过滤的规则是怎样的。它就会针对我们RDD中的每一个元素去判断,判断为true的就把它留下来。

例1:针对一个包含数据1,2,3,4四个元素的RDD,去过滤其中小于或等于2的元素:

scala> val data = sc.parallelize(List(1,2,3,4))

data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> data.filter(x=>x>2).collect

res0: Array[Int] = Array(3, 4)


例2:创建4名学生考试数据信息的RDD,使每名学生考试数据信息都包括姓名、考试科目、考试成绩,字符之间用空格连接,然后使用filter算子找出成绩为100分的学生姓名和考试科目。

方法一:

scala> val scores=sc.parallelize(List("张三 Hadoop 85","李四 Python 95","王五 Hadoop 100","赵六 Java 100"))

scores: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:23
scala> val s100 = scores.map(s=>s.split(" ")).filter(arr=>arr(2).toInt==100).map(arr=>(arr(0),arr(1)))

s100: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[26] at map at <console>:23

scala> s100.collect.foreach(println)

(王五,Hadoop)

(赵六,Java)


方法二:

scala> val s100 = scores.map{
| x =>
| val splits = x.split(" ");
| (splits(0),splits(1),splits(2).toInt)
| }.filter(_._3 == 100).map(x => (x._1,x._2))

s100: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[34] at map at <console>:27
scala> s100.collect.foreach(println)

(王五,Hadoop)

(赵六,Java)


5. distinct:它是帮我们去针对RDD中的元素进行去重的,只保留第一次出现的元素。

例:一个RDD(1,2,3,3),对它进行去重后查看:

scala> val data = sc.parallelize(List(1,2,3,3))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:23
scala> data.distinct.collect
res1: Array[Int] = Array(2, 1, 3)


(三)排序操作

6. sortBy:从名字上大家也应该能看出它是干什么的,就是做排序的。

在我们使用这个方法进行排序的时候,需要给进去一些什么参数呢?

第一个参数,也是一个自定义函数,用来指定对于RDD中的哪一部分元素来进行排序;大家可以看到这里面的内容,比如这里给进去一个function,他里面的形参是T,就表示我们RDD中的每一个元素;右边的返回值就表示我们要根据T当中的哪一个值或者哪一部分值去进行排序,这个是我们第一部分这个参数。

第二个参数是:ascending:表示你要不要根据升序进行排序,默认是true升序,要想降序就得改成false

第三个参数是:numPartitions:可以利用它去指定排序后返回的RDD的分区数是多少,假如不指定,这里会默认按照原有的RDD分区数去进行返回。

例1:有一个列表,里面存放的都是二元组List((1,3),(45,3),(7,6)),我们把它转为RDD之后,如何根据其中的一部分值去进行排序呢?

按二元组的第二个值去进行降序排序:

scala> val data =sc.parallelize(List((1,3),(45,3),(7,6)))
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:23

scala> val sort_data=data.sortBy(x=>x._2,false,1)
sort_data: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[13] at sortBy at <console>:23

scala> sort_data.glom.collect        //这里结合glom去查看它是不是只有一个分区
res9: Array[Array[(Int, Int)]] = Array(Array((7,6), (45,3), (1,3)))


按二元组的第一个值去进行升序排序:

scala> val sort_data=data.sortBy(x=>x._1)
sort_data: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[10] at sortBy at <console>:23


scala> sort_data.glom().collect
res0: Array[Array[(Int, Int)]] = Array(Array((1,3), (7,6)), Array((45,3)))


例2:将之前的filter案例中4名学生考试成绩("张三 Hadoop 85","李四 Python 95","王五 Hadoop 100","赵六 Java 100"),排序后取前三名,显示姓名、科目和成绩。

scala> val s100 = scores.map{x =>val splits = x.split(" ");(splits(0),splits(1),splits(2).toInt)}
s100: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[39] at map at <console>:23

scala> s100.sortBy(_._3,false).take(3).foreach(println)
(王五,Hadoop,100)
(赵六,Java,100)
(李四,Python,95)



接下来来独立完成一个小任务:

任务2:根据任务1得到的RDD bigdata及math,取出成绩排名前5名的学生成绩信息

(1) 先拆分(拆分后直接查看)

scala> bigdata.map(x=>x.split("\t")).take(1)    //拆分后,里面每一个元素也还是字符串

res7: Array[Array[String]] = Array(Array(1001, 大数据基础, 90))


(2) 接下来把它的成绩进行int类型的转换

scala> val bigdata_map=bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(1),x(2).toInt))

bigdata_map: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[22] at map at <console>:23


(3) 根据第三个元素进行排序

scala> val bigdata_sort=bigdata_map.sortBy(x=>x._3,false)

bigdata_sort: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[27] at sortBy at <console>:23


(4) 显示结果

scala> bigdata_sort.take(5)

res8: Array[(String, String, Int)] = Array((1007,大数据基础,100), (1003,大数据基础,100), (1004,大数据基础,99), (1002,大数据基础,94), (1006,大数据基础,94))

大家自己练习针对math成绩进行降序排序,取倒数前3名。

scala> val math=sc.textFile("/rdd_data/result_math.txt")

math: org.apache.spark.rdd.RDD[String] = /rdd_data/result_math.txt MapPartitionsRDD[18] at textFile at <console>:23

scala> val math_map=math.map(x=>x.split("\t")).map(x=>(x(0),x(1),x(2).toInt))

math_map: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[30] at map at <console>:23

scala> val math_sort=math_map.sortBy(x=>x._3)

math_sort: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[35] at sortBy at <console>:23

scala> math_sort.take(3)

res13: Array[(String, String, Int)] = Array((1011,应用数学,79), (1006,应用数学,80), (1009,应用数学,84))


(四)集合操作

7. union:也是之前出镜过的一个算子了,这个方法是帮我们把两个RDD合并起来,因为它有点类似于把我们两张表上下去把它拼接在一起,所以说咱们去针对两个RDD去做union处理的时候呢,是需要保证说两个RDD的元素类型是一致的,并且该算子不会进行去重操作。

例:合并rdd1和rdd2

scala> val rdd1=sc.parallelize(List(1,2,3))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:23


scala> val rdd2=sc.parallelize(List(1,2,4))

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:23


scala> rdd1.union(rdd2).collect    //这里可以看到,它把这两个RDD中的元素直接拼合在一起了。

res3: Array[Int] = Array(1, 2, 3, 1, 2, 4)


scala> rdd2.union(rdd1).collect

res9: Array[Int] = Array(1, 2, 4, 1, 2, 3)


大家也可以去试一下,把rdd1中的元素改成字符串类型,看看能不能拼合成功。

scala> val rdd3=sc.parallelize(List("a","b","c"))

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:23


scala> rdd3.union(rdd2).collect

<console>:25: error: type mismatch;

found : org.apache.spark.rdd.RDD[Int]

required: org.apache.spark.rdd.RDD[String]

rdd1.union(rdd2).collect


8. intersection:求两个RDD的共同元素,也就相当于是求交集。它的用法和union是一样的。

这里就复用刚才的rdd1和rdd2进行演示:

scala> val rdd1=sc.parallelize(List(1,2,3))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:23


scala> rdd1.intersection(rdd2).collect

res5: Array[Int] = Array(2, 1)


既然有并集有交集,那当然也少不了差集了:

9. substract:再次复用前面的rdd1和rdd2数据:

scala> rdd1.subtract(rdd2).collect

res6: Array[Int] = Array(3)


scala> rdd2.subtract(rdd1).collect

res7: Array[Int] = Array(4)

咱们在使用这些方法的时候,是有一个顺序在里面的,你是针对哪个RDD去调用的union、intersection、subtract,相当于你要拿哪个RDD作为主表去求,不要乱用。求交集可能还好,但是使用union和subtract的时候,针对谁去调用,那一部分值union的话主表就会被放在前边,针对哪个RDD去求差集的话,它就会对比主表和后面的RDD哪一部分是不重复的,保留下来。


10. cartesian /kɑ:ˈti:ʒən/ :笛卡尔积就是将两个集合的元素两两组合成一组。

scala> rdd3.cartesian(rdd1).collect

res6: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (b,1), (c,1), (b,2), (b,3), (c,2), (c,3))


scala> rdd1.cartesian(rdd3).collect

res7: Array[(Int, String)] = Array((1,a), (1,b), (1,c), (2,a), (3,a), (2,b), (2,c), (3,b), (3,c))

任务3:

1. 找出考试成绩得过100分的学生ID,最终的结果需要集合到一个RDD中。

scala>val result=bigdata.union(math).map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt)).filter(x=>x._2==100).map(x=>x._1).distinct

result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[73] at distinct at <console>:24


scala> result.collect

res18: Array[String] = Array(1003, 1007, 1004)


2. 找出两门成绩都得过100分的学生ID,结果汇总为一个RDD。

scala> val bigdata100=bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt)).filter(x=>x._2==100)

bigdata100: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[102] at filter at <console>:23


scala> val math100=math.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt)).filter(x=>x._2==100)

math100: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[105] at filter at <console>:23


scala> bigdata100.collect

res24: Array[(String, Int)] = Array((1003,100), (1007,100))


scala> math100.collect

res25: Array[(String, Int)] = Array((1003,100), (1004,100))


scala> bigdata100.intersection(math100).map(x=>x._1).collect

res26: Array[String] = Array(1003)


错误说明:

scala> val bigdata100=bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt)).filter(x=>x._2==100).collect

scala> val math100=math.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt)).filter(x=>x._2==100).collect

scala> bigdata100.intersection(math100)

提示出错是怎么回事????


(五)键值对RDD操作

前面我们介绍了一些针对RDD的常用操作,基本上这部分操作大多数都是通用的,为什么呢?因为它们主要是针对单值RDD的操作,下面我们来介绍键值对RDD,也叫PairRDD,它是一种特殊的RDD,里面的每一个元素都包含键和值两部分。

Spark专门提供了一些针对PairRDD的算子去使用,通常我们使用PairRDD的常用场景呢,是针对数据进行一些分组啊、聚合啊之类的操作。

我们先来看键值对RDD的创建方式:

1. 使用map函数创建PairRDD

scala> val rdd=sc.parallelize(List("a","b","c","c"))    

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[113] at parallelize at <console>:23


scala> rdd.collect        //rdd由几个字母组成

res27: Array[String] = Array(a, b, c, c)


scala> val rdd2=rdd.map(x=>(x,1))        //使用map构建新的RDD,每个字母当键,1做值

rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[114] at map at <console>:23


scala> rdd2.collect        //这里就整合成了键值对RDD了,每一个元素都是有键有值的

res28: Array[(String, Int)] = Array((a,1), (b,1), (c,1), (c,1))


2. Zip:打包操作创建pairRDD:

scala> val rdd1=sc.parallelize(Array(1,2,3))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[72] at parallelize at <console>:23


scala> val rdd2=sc.parallelize(Array("a","b","c"))

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[73] at parallelize at <console>:23


scala> val zipRDD1=rdd1.zip(rdd2)

zipRDD1: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[74] at zip at <console>:24


scala> val zipRDD2=rdd2.zip(rdd1)

zipRDD2: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[75] at zip at <console>:24


scala> zipRDD1.collect

res44: Array[(Int, String)] = Array((1,a), (2,b), (3,c))


scala> zipRDD2.collect

res45: Array[(String, Int)] = Array((a,1), (b,2), (c,3))


该算子要求两个RDD的分区数量以及每个分区中元素的数量都相同:

scala> val rdd3=sc.parallelize(Array(1,2,3),3)

rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:23


scala> rdd3.collect

res0: Array[Int] = Array(1, 2, 3)


scala> rdd2.zip(rdd3)

res1: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[3] at zip at <console>:25


scala> val zipRDD3 = rdd2.zip(rdd3)

zipRDD3: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[4] at zip at <console>:24


scala> zipRDD3.collect        //报错,原因:分区数量不同

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 3)



3. 那么键值对RDD常用于什么场景呢?最基本的是通过keys和values去提取键值对当中键的那部分和值的那部分出来,用于构建一个新的RDD。

scala> zipRDD2.keys.collect

res29: Array[String] = Array(a, b, c, c)


scala> zipRDD2.values.collect

res30: Array[Int] = Array(1, 1, 1, 1)


scala> rdd2.keys    //不论是键还是值,它的计算结果都是一个RDD类型的数据集合

res31: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[117] at keys at <console>:24


除此之外呢,我们还可以单独针对其中值的部分去进行一个映射处理:

4. mapValues:跟map很类似,区别在于,它不去动键部分的数据,而只针对值进行处理。

scala> zipRDD2.mapValues(x=>(x,1)).collect

res32: Array[(String, (Int, Int))] = Array((a,(1,1)), (b,(1,1)), (c,(1,1)), (c,(1,1)))


5. groupByKey:这个方法是按照键去进行分组的方法,分组后它返回的就不是单独的值了,这里它返回的就是一个迭代器,或者说是一个可迭代对象。

事实上,在我们做完分组之后,要去针对它聚合好的值去进行计算,是需要后续的一些操作的。我们来试一下:

scala> rdd2.groupByKey().collect        //针对rdd2去进行一个groupByKey

res39: Array[(String, Iterable[Int])] = Array((b,CompactBuffer(1)), (a,CompactBuffer(1)), (c,CompactBuffer(1, 1)))

可以看到:键的部分还是键,但是值的部分是变成了一个Iterable,是把这里面键相同的一些元素的值全部放在了一起。例如,其中的c对应的值是两个1嘛。


接下来,我们相对它当前聚合好的这个值,去进行一个求和的话,可以利用前面我们学习过的mapValues去进行:

scala> rdd2.groupByKey().mapValues(x=>x.sum).collect

res40: Array[(String, Int)] = Array((b,1), (a,1), (c,2))


但是,这里可能大家会觉得很麻烦,我们已经分完组了,我想要对分组后的那部分值进行计算还要再调用mapValues,这里能不能一步到位呢?来看下一个方法——

6. reduceByKey:它跟前一个groupByKey很像,区别在于groupByKey是直接按照键去进行归类,之后再借助其他方法的辅助再去对聚合后的值部分进行计算。但reduceByKey,看语法,它还需要我们提供一个函数func,它就是用来进行聚合的函数,意思就是你分完组之后,针对这个分组后的值进行这样一个func的处理。

这里注意,我们给进去的func函数,必须是(V,V)=>V类型,也就是两个形参,为啥是两个呢?就是表示说同一组里的的若干个值。

scala> rdd2.reduceByKey((x,y)=>x+y).collect

res41: Array[(String, Int)] = Array((b,1), (a,1), (c,2))

观察结果,会发现它跟我们groupByKey后通过mapValues求和得到的结果是一样的。

那么它是怎么去计算的呢?

一样的,先是按键把值进行聚合在一起,然后再根据传入的函数对同组的值进行遍历处理。

例如RDD数据里有(c , 1) , (c , 1) , (c , 2) , (c , 3) , (c , 4)

x=1 , y=1,先计算返回 (c , 2)

再取x=2 , y=2,先计算返回 (c , 4)

再取x=4 , y=3,先计算返回 (c , 7)

再取x=7 , y=4,先计算返回 (c , 11)


课堂练习:统计(1,3,4,7,2)中奇数和偶数的个数

scala> val rdd =sc.parallelize(List(1,3,4,7,2))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[103] at parallelize at <console>:26


scala> rdd.map(x=> if(x%2==0) ("偶数",1) else ("奇数",1)).reduceByKey(_+_).collect

res46: Array[(String, Int)] = Array((偶数,2), (奇数,3))







7. join:用来进行连接的算子,把两个键值对RDD看成是两张表,按照键的位置去进行一个整合,就跟我们前面的groupByKey有些类似,但这里是针对两个键值对RDD进行分组的。

并且除了join之外,还有其他一些连接方法去使用,例如leftOuterJoin(左外连接), rightOuterJoin(右外连接), and fullOuterJoin(全外连接),这些连接方法名称不同,用法相同。

scala> val rdd1=sc.parallelize(List(("K1","V1"),("K2","V2"),("K3","V3")))

rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[124] at parallelize at <console>:23


scala> val rdd2=sc.parallelize(List(("K1","W1"),("K2","W2")))

rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[125] at parallelize at <console>:23


scala> rdd1.join(rdd2).collect

res42: Array[(String, (String, String))] = Array((K1,(V1,W1)), (K2,(V2,W2)))


scala> rdd2.join(rdd1).collect

res43: Array[(String, (String, String))] = Array((K1,(W1,V1)), (K2,(W2,V2)))


下面来看rdd中常用的Action类型算子

8. lookup(Key:K)——应用于键值对RDD,针对键值对RDD,按照给定的Key去寻找对应的值

scala> rdd.collect

res14: Array[(String, Int)] = Array((a,1), (b,1), (c,1), (c,1))


scala> rdd.lookup("c")    //把多个键值对中对应键“c”的值都给找出来

res15: Seq[Int] = WrappedArray(1, 1)


9. collect 和collectAsMap:后者只能运用于pairRDD,前者都可以更通用,它们的主要区别在于返回的类型不一样,使用collect时,会将所有的数据内容作为一个Array返回,但假如使用collectAsMap,这里会返回一个map类型的数据

scala> rdd.collect

res14: Array[(String, Int)] = Array((a,1), (b,1), (c,1), (c,1))


scala> rdd.collectAsMap

res16: scala.collection.Map[String,Int] = Map(b -> 1, a -> 1, c -> 1)


10. take(n):以Array形式返回RDD的前n条记录,跟collect的返回效果差不多,区别在于返回的数据量由我们自己定义。


11. count:计算RDD中的所有元素个数

scala> rdd.count

res18: Long = 4


12. reduce


小练习:

练习1:

scala> val rdd =sc.parallelize(List(1,3,4,7,2))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[103] at parallelize at <console>:26


scala> rdd.map{case x=> if(x%2==0) ("偶数",1) else ("奇数",1)}.reduceByKey(_+_).collect



任务4:

1. 输出每位学生的总成绩,要求将两个成绩表中学生ID相同的成绩相加。

scala> val score_kv=bigdata.union(math).union(english).map(_.split("\t")).map(x=>(x(0),x(2).toInt))

score_kv: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[97] at map at <console>:25


scala> val total_score=score_kv.reduceByKey(_+_).sortBy(x=>x._1)

total_score: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[71] at sortBy at <console>:23


scala> total_score.collect

res14: Array[(String, Int)] = Array((1001,285), (1002,275), (1003,290), (1004,199), (1005,184), (1006,174), (1007,190), (1008,187), (1009,173), (1010,164), (1011,170), (1012,175))


2. 输出每位学生的平均成绩,要求将两个成绩表中学生ID相同的成绩相加并计算出平均分。

scala> val score_kv_count=score_kv.mapValues(x=>(x,1))

score_kv_count: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at mapValues at <console>:23


scala>val average_score=score_kv_count.reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>x._1/x._2).sortBy(_._1)

average_score: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[78] at sortBy at <console>:23


scala> average_score.collect

res15: Array[(String, Int)] = Array((1001,95), (1002,91), (1003,96), (1004,99), (1005,92), (1006,87), (1007,95), (1008,93), (1009,86), (1010,82), (1011,85), (1012,87))


3.合并每个学生的总成绩和平均成绩。

scala> val result=total_score.join(average_score)

scala> result.collect

res25: Array[(String, (Int, Int))] = Array((1005,(184,92)), (1006,(174,87)), (1001,(285,95)), (1010,(164,82)), (1007,(190,95)), (1008,(187,93)), (1002,(275,91)), (1011,(170,85)), (1012,(175,87)), (1009,(173,86)), (1003,(290,96)), (1004,(199,99)))


问:如果想将结果整理成——(1005,赵信,184,92)的形式,如何完成?

scala>student.map(_.split("\t")).map(x=>(x(0),x(1))).join(total_score).join(average_score).map(x=>(x._1,x._2._1._1,x._2._1._2,x._2._2)).collect

res33: Array[(String, String, Int, Int)] = Array((1005,赵信,184,92)......


4、 文件读取与存储

下面我们一起看一下RDD进行读写时可采用的一些方式——

在我们对RDD进行读写的时候最常用的是两种格式:

1. 文本文件格式:就是我们平时常见的那种,把每一行的数据作为一条记录去存放在文件中的格式,它是属于非结构化的一种格式。

2. 序列化文件格式(/ˈsiːkwəns/)是一种用于键值对数据存储的Hadoop文件格式,它是属于结构化数据的。


下面我们就来看一下对处理好的RDD数据进行存储的操作:

1. saveAsTextFile——存储为文本文件:

把练习4中的result保存为文本文件:

scala> result.saveAsTextFile("/user/root/result0")


注意:这里的文件路径必须是不存在的目录

保存后打开HDFS浏览文件,发现里面有很多个文件:

_SUCCESS:内容为空,仅仅记录当前这个任务的运行状态是否成功,还是说有其他的一些问题。

part-0001 ~part-000n:这些才是真正存放数据的结果文件,那为什么会有这么多的数据文件呢?这里数据文件的个数是跟当前这个RDD的分区数有关系。

scala> result.partitions.size

res9: Int = 6


如果我们重新去写一个rdd:

scala> data.partitions.size

res10: Int = 2


查看它的分区数只有2:

scala> data.saveAs

saveAsObjectFile saveAsTextFile


scala> data.saveAsTextFile("/user/root/result1")

保存后查看结果,就只有两个数据文件了。


以上!

其实我们在存储时,有些情况下,只是想在算完后,就直接把它保留在一份数据文件中,方便我之后再去调用它,这里有没有办法直接让它所有的内容都存放在同一个文件下呢?其实也是可以的——

repartition——对分区数进行调整

其实前面我们也讲了,在创建RDD时是可以直接调整分区数的。

但是对于现有的、已经计算处理完毕的RDD,我们如何调整分区数呢?

这里就可以用reparation

也可以用coalesce(/ˌkəʊəˈles/)

这两种方法其实都是一样,都是为了去调整当前RDD的分区数

区别在于coalesce可以通过shuffle参数去控制改变分区数时是否还要执行shuffle阶段。

而reparation是没有这样的参数去选择的。

所以通常情况下会采用coalesce,避免shuffle的过程,以提高整体的性能。

这里的原理是什么呢?

比如我有一个图书馆,里面有100个书架,上面摆放了各个类型的数。

这时我们要搬迁了,新图书馆里只有50个书架,那我们就需要把原有的数进行合并,每两个书架的书合并后放入一个新书架里面就行了。

但是假如,我们要搬到的是一个150个书架的新图书馆里,我们就得重新针对现有的100个书架进行整理,去重新分门别类,从每个书架中抽取一部分书,放到新增的书架中去。

那么这个重新整理、分门别类的过程就相当于我们说的shuffle过程。

ok,那么就是说减少分区数呢,我们就用coalesce,如果是要增加分区数呢,我们就用repartition

正常情况下来讲其实两个都可以用

因为这里面,无论是减少还是增加分区数,shuffle过程都是可以执行的

但是coalesce的shuffle参数为false时,就只能用于减少分区数。


scala> result.coalesce(1,false).saveAsTextFile("/user/root/result2")

scala> result.repartition(1).saveAsTextFile("/user/root/result3")


假如我们要把RDD存储为SequenceFile的话使用——

2. saveAsSequenceFile(path)——这里保存的文件格式就是hadoop的SequenceFile文件格式,它只支持存储键值对RDD。

保存下来之后,我们可以看到数据文件的内容,被序列化了,无法直接看到它的内容。并且它在前边这里存下了这样一些键值对的数据类型,这里特别需要注意:因为后续我们从这些数据文件中恢复读取RDD的时候,是需要利用这个指定的键和值的类型去进行读取的。

scala> val rdd=sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[54] at parallelize at <console>:23


scala> rdd.saveAsSequenceFile("/user/data/sqf_result0")


3. 读取序列化文件

scala> import org.apache.hadoop.io.{Text,IntWritable}

import org.apache.hadoop.io.{Text, IntWritable} ^


scala>sc.sequenceFile("/user/data/sqf_result0",classOf[Text],classOf[IntWritable]).map{case (x,y)=>(x.toString,y.get())}.collect

res20: Array[(String, Int)] = Array((Panda,3), (Kay,6), (Snail,2))


任务6:汇总学生成绩并以文本格式存储在HDFS上,数据汇总为学生ID,姓名,总分,平均分。

整理数据:

cala> val student_kv=student.map(_.split("\t")).map(x=>(x(0),x(1)))

student_kv: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[59] at map at <console>:26

scala> val result=student_kv.join(total_score).join(average_score)
result: org.apache.spark.rdd.RDD[(String, ((String, Int), Int))] = MapPartitionsRDD[65] at join at <console>:28

scala> result.take(1)

res22: Array[(String, ((String, Int), Int))] = Array((1005,((赵信,184),92)))

scala> val save_data=result.mapValues{case (x,y)=>(x._1,x._2,y)}.map{case (x,y)=>(x,y._1,y._2,y._3)}
save_data: org.apache.spark.rdd.RDD[(String, String, Int, Int)] = MapPartitionsRDD[69] at map at <console>:26

scala> save_data.take(1)
res27: Array[(String, String, Int, Int)] = Array((1005,赵信,184,92))scala> save_data.saveAsTextFile("/user/data/student_grade")

发现分区文件太多,所以把导出目录删除后,重新调整分区再输出:

scala> save_data.coalesce(1,false).saveAsTextFile("/user/data/student_grade")



综合练习:基于3个基站的日志数据,要求计算某个手机号码在一天之内出现时间最多的两个地点。


这个练习使用的是ABC三个基站某一天当中记录下来的日志数据,这三个基站日志数据分别被保存成了3个TXT文件,这里的存储格式也都是一样的

每一份日志当中存储的的数据字段都是相同的:手机号、时间戳(格林威治时间自1970年1月1日(00:00:00 GMT)至当前时间的总秒数),基站ID,本次连接状态

这里的连接状态意思是:比如我们出门逛街,人是不断移动的,人移动的过程当中,手机要保持通讯,就必须要通过基站传递信号,在我们进入某个基站的覆盖范围时呢,我们就断开了原来的基站连接上现在所在的基站。

这个连接状态数据就记录了用户的手机和基站所对应的连接1和断开0记录。


那怎么计算呢?

假如我们要去计算用户停留时间最长的两个地点的话,所谓的停留时间如何去计算呢?那其实就是去计算在某一个基站连接跟断开这两部分之间的时间差。所以这里只需要对用户按照基站进行分组,算出停留时间,再根据停留时间进行排序,取出最大的这两个基站就可以了。


scala> val sta=sc.textFile("/user/data/station/StationA.txt").map(_.split(",")).map(x=>(x(0),x(1).toLong,x(2),x(3)))

sta: org.apache.spark.rdd.RDD[(String, Long, String, String)] = MapPartitionsRDD[86] at map at <console>:26


scala> val stb=sc.textFile("/user/data/station/StationB.txt").map(_.split(",")).map(x=>(x(0),x(1).toLong,x(2),x(3)))

stb: org.apache.spark.rdd.RDD[(String, Long, String, String)] = MapPartitionsRDD[90] at map at <console>:26


scala> val stc=sc.textFile("/user/data/station/StationC.txt").map(_.split(",")).map(x=>(x(0),x(1).toLong,x(2),x(3)))

stc: org.apache.spark.rdd.RDD[(String, Long, String, String)] = MapPartitionsRDD[94] at map at <console>:26


scala> val station=sta.union(stb).union(stc)

station: org.apache.spark.rdd.RDD[(String, Long, String, String)] = UnionRDD[96] at union at <console>:28


scala> station.count    //拼接成功

res32: Long = 20


假如我进入某个基站覆盖范围,我的手机会先跟它基站连接上记录状态1,等我离开这个地方的时候,手机会跟基站断开,记录状态0。

这里手机跟某个基站的断开记录0一定是大于它连接上的那个1时间戳的,所以这里我们就可以先根据数据中的连接状态值针对我们的时间戳进行处理,假如说他是连接状态1,我们就把它改为负值,假如是断开状态0,我们就不动。

做完这步处理,状态字段我们就可以舍弃了。因为即使没有状态字段,我们也可以根据基站分组,分组完毕后,再根据组内的时间戳去进行相加了。因为这里连接的部分已经别转换为负值了,这样他跟组内的其他断开的时间戳(保持正值)相加,就求出了从连接到断开的时间差

scala> val data=station.map{x=>if(x._4=="1") ((x._1,x._3),-x._2) else ((x._1,x._3),x._2)}.reduceByKey(_+_)
data: org.apache.spark.rdd.RDD[((String, String), Long)] = ShuffledRDD[115] at reduceByKey at <console>:26

scala> data.take(1)
res53: Array[((String, String), Long)] = Array(((18688888888,CC0710CC94ECC657A8561DE549D940E0),1300))


scala> val data2=data.map(x=>(x._1._1,(x._1._2,x._2))).groupByKey.mapValues{x=>val line=x.toList.sortBy(y=>y._2).reverse.take(2).map(z=>z._1);line}
data2: org.apache.spark.rdd.RDD[(String, List[String])] = MapPartitionsRDD[124] at mapValues at <console>:26

scala> data2.collect
res57: Array[(String, List[String])] = Array((18688888888,List(16030401EAFB68F1E3CDF819735E1C66, 9F36407EAD0629FC166F14DDE7970F68)), (18611132889,List(16030401EAFB68F1E3CDF819735E1C66, 9F36407EAD0629FC166F14DDE7970F68)))



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。