RDD原理与基本操作 | Spark,从入门到精通

举报
Tracy 发表于 2019/10/15 15:01:14 2019/10/15
【摘要】 欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)什么是 RDD?传统的 MapReduce 虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是在迭代计算式的时候,要进行大量的磁盘 IO 操作,而 RDD 正是解决这一缺点的抽象方法。RDD(Resili...

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)

什么是 RDD?

传统的 MapReduce 虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是在迭代计算式的时候,要进行大量的磁盘 IO 操作,而 RDD 正是解决这一缺点的抽象方法。RDD(Resilient Distributed Datasets)即弹性分布式数据集,从名字说起:

弹性

当计算过程中内存不足时可刷写到磁盘等外存上,可与外存做灵活的数据交换;

RDD 使用了一种“血统”的容错机制,在结构更新和丢失后可随时根据血统进行数据模型的重建;

分布式

就是可以分布在多台机器上进行并行计算;

数据集

一组只读的、可分区的分布式数据集合,集合内包含了多个分区。分区依照特定规则将具有相同属性的数据记录放在一起,每个分区相当于一个数据集片段。

RDD 内部结构

上图是 RDD 的内部结构图,它是一个只读、有属性的数据集。它的属性用来描述当前数据集的状态,数据集由数据的分区(partition)组成,并由(block)映射成真实数据。RDD 的主要属性可以分为 3 类:与其他 RDD 的关系(parents、dependencies);数据(partitioner、checkpoint、storage level、iterator 等);RDD 自身属性(sparkcontext、sparkconf),接下来我们根据属性分类来深入介绍各个组件。

RDD 自身属性

从自身属性说起,SparkContext 是 Spark job 的入口,由 Driver 创建在 client 端,包括集群连接、RDD ID、累加器、广播变量等信息。SparkConf 是参数配置信息,包括:

  • Spark api,控制大部分的应用程序参数;

  • 环境变量,配置IP地址、端口等信息;

  • 日志配置,通过 log4j.properties 配置。

数据

RDD 内部的数据集合在逻辑上和物理上被划分成多个小子集合,这样的每一个子集合我们将其称为分区(Partitions),分区的个数会决定并行计算的粒度,而每一个分区数值的计算都是在一个单独的任务中进行的,因此并行任务的个数也是由 RDD分区的个数决定的。但事实上 RDD 只是数据集的抽象,分区内部并不会存储具体的数据。Partition 类内包含一个 index 成员,表示该分区在 RDD 内的编号,通过 RDD 编号+分区编号可以确定该分区对应的唯一块编号,再利用底层数据存储层提供的接口就能从存储介质(如:HDFS、Memory)中提取出分区对应的数据。

RDD 的分区方式主要包含两种:Hash Partitioner 和 Range Partitioner,这两种分区类型都是针对 Key-Value 类型的数据,如是非 Key-Value 类型则分区函数为 None。Hash 是以 Key 作为分区条件的散列分布,分区数据不连续,极端情况也可能散列到少数几个分区上导致数据不均等;Range 按 Key 的排序平衡分布,分区内数据连续,大小也相对均等。

Preferred Location 是一个列表,用于存储每个 Partition 的优先位置。对于每个 HDFS 文件来说,这个列表保存的是每个 Partition 所在的块的位置,也就是该文件的「划分点」。

Storage Level 是 RDD 持久化的存储级别,RDD 持久化可以调用两种方法:cache 和 persist:persist 方法可以自由的设置存储级别,默认是持久化到内存;cache 方法是将 RDD 持久化到内存,cache 的内部实际上是调用了persist 方法,由于没有开放存储级别的参数设置,所以是直接持久化到内存。


8859dee4bb5b49f5aa747e2bafd53063.png

上图所示是 Storage Level 各级别分布,那么如何选择一种最合适的持久化策略呢?默认情况下,性能最高的当然是 MEMORY_ONLY,但前提是你的内存必须足够大到可以绰绰有余地存放下整个 RDD 的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果 RDD 中数据比较多时(比如几十亿),直接用这种持久化级别,会导致 JVM 的 OOM 内存溢出异常。

如果使用 MEMORY_ONLY 级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER 级别。该级别会将 RDD 数据序列化后再保存在内存中,此时每个 partition 仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比 MEMORY_ONLY 多出来的性能开销主要就是序列化与反序列化的开销,但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。但可能发生 OOM 内存溢出的异常。

如果纯内存的级别都无法使用,那么建议使用 MEMORY_AND_DISK_SER 策略,而不是 MEMORY_AND_DISK 策略。因为既然到了这一步,就说明 RDD 的数据量很大,内存无法完全放下,序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

通常不建议使用 DISK_ONLY 和后缀为_2 的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销。

**Checkpoint **是 Spark 提供的一种缓存机制,当需要计算依赖链非常长又想避免重新计算之前的 RDD 时,可以对 RDD 做 Checkpoint 处理,检查 RDD 是否被物化或计算,并将结果持久化到磁盘或 HDFS 内。Checkpoint 会把当前 RDD 保存到一个目录,要触发 action 操作的时候它才会执行。在 Checkpoint 应该先做持久化(persist 或者 cache)操作,否则就要重新计算一遍。若某个 RDD 成功执行 checkpoint,它前面的所有依赖链会被销毁。

与 Spark 提供的另一种缓存机制 cache 相比:cache 缓存数据由 executor 管理,若 executor 消失,它的数据将被清除,RDD 需要重新计算;而 checkpoint 将数据保存到磁盘或 HDFS 内,job 可以从 checkpoint 点继续计算。Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 这样的方法,相当于 cache 到磁盘上,这样可以使 RDD 第一次被计算得到时就存储到磁盘上,它们之间的区别在于:persist 虽然可以将 RDD 的 partition 持久化到磁盘,但一旦作业执行结束,被 cache 到磁盘上的 RDD 会被清空;而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的。

**Compute **函数实现方式就是向上递归「获取父 RDD 分区数据进行计算」,直到遇到检查点 RDD 获取有缓存的 RDD。

**Iterator **用来查找当前 RDD Partition 与父 RDD 中 Partition 的血缘关系,并通过 Storage Level 确定迭代位置,直到确定真实数据的位置。它的实现流程如下:

  • 若标记了有缓存,则取缓存,取不到则进行 computeOrReadCheckpoint(计算或读检查点)。完了再存入缓存,以备后续使用。

  • 若未标记有缓存,则直接进行 computeOrReadCheckpoint。

  • computeOrReadCheckpoint 这个过程也做两个判断:有做过 checkpoint 和没有做过 checkpoint,做过 checkpoint 则可以读取到检查点数据返回,没做过则调该 RDD 的实现类的 compute 函数计算。

血统关系

一个作业从开始到结束的计算过程中产生了多个 RDD,RDD 之间是彼此相互依赖的,我们把这种父子依赖的关系称之为「血统」。

RDD 只支持粗颗粒变换,即只记录单个块(分区)上执行的单个操作,然后创建某个 RDD 的变换序列(血统 lineage)存储下来。

*变换序列指每个 RDD 都包含了它是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。

因此 RDD 的容错机制又称「血统」容错。 要实现这种「血统」容错机制,最大的难题就是如何表达父 RDD 和子 RDD 之间的依赖关系。


746d1c20f425410f8fbb526500c600b7.png

上图所示父 RDD 的每个分区最多只能被子 RDD 的一个分区使用,称为窄依赖(narrow dependency);若父 RDD 的每个分区可以被子 RDD 的多个分区使用,称为宽依赖(wide dependency)。简单来讲,窄依赖就是父子RDD分区间「一对一」的关系,而宽依赖就是「一对多」关系。从失败恢复来看,窄依赖的失败恢复起来更高效,因为它只需找到父 RDD 的一个对应分区即可,而且可以在不同节点上并行计算做恢复;宽依赖牵涉到父 RDD 的多个分区,需要得到所有依赖的父 RDD 分区的 shuffle 结果,恢复起来相对复杂些。


a189a0db5038417f9adcb38a4631ebcf.png

根据 RDD 之间的宽窄依赖关系引申出 Stage 的概念,Stage 是由一组 RDD 组成的执行计划。如果 RDD 的衍生关系都是窄依赖,则可放在同一个 Stage 中运行,若 RDD 的依赖关系为宽依赖,则要划分到不同的 Stage。这样 Spark 在执行作业时,会按照 Stage 的划分, 生成一个最优、完整的执行计划。

RDD 的创建方式与分区机制

RDD 的创建方式

RDD 的创建方式有四种:

1.使用程序中的集合创建 RDD,RDD 的数据源是程序中的集合,通过 parallelize 或者 makeRDD 将集合转化为 RDD;

*例

val num = Array(1,2,3,4,5)val rdd = sc.parallelize(num)

2.使用本地文件或 HDFS 创建 RDD,RDD 的数据源是本地文件系统或 HDFS 的数据,使用 textFile 方法创建RDD。 *例

val rdd = sc.textFile(“hdfs://master:9000/rec/data”)

3.使用数据流创建 RDD,使用 Spark Streaming 的相关类,接收实时的输入数据流创建 RDD(数据流来源可以是 kafka、flume 等)。

*例

val ssc = new StreamingContext(conf, Seconds(1))val lines = ssc.socketTextStream(“localhost”, 9999)val words = lines.flatMap(_.split(“ ”))

4.使用其他方式创建 RDD,从其他数据库上创建 RDD,例如 Hbase、MySQL 等。

*例

val sqlContext = new SQLContext(sc)val url = "jdbc:mysql://ip:port/xxxx"val prop = new Properties()val df = sqlContext.read.jdbc(url, “play_time”, prop)

RDD 的分区机制

RDD 的分区机制有两个关键点:一个是关键参数,即 Spark 的默认并发数 spark.default.parallelism;另一个是关键原则,RDD 分区尽可能使得分区的个数等于集群核心数目。

当配置文件 spark-default.conf 中显式配置了 spark.default.parallelism,那么 spark.default.parallelism=配置的值,否则按照如下规则进行取值:

1.本地模式(不会启动 executor,由 SparkSubmit 进程生成指定数量的线程数来并发)

spark-shell spark.default.parallelism = 1 spark-shell --master local[N] spark.default.parallelism = N (使用 N 个核) spark-shell --master local spark.default.parallelism = 1

2.伪集群模式(x 为本机上启动的 executor 数,y 为每个 executor 使用的 core 数,z 为每个 executor 使用的内存)

spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y

3.Yarn、standalone 等模式

spark.default.parallelism = max(所有 executor 使用的 core 总数,2)

4.Mesos

spark.default.parallelism = 8

spark.context 会生成两个参数,由 spark.default.parallelism 推导出这两个参数的值:

sc.defaultParallelism     = spark.default.parallelism

sc.defaultMinPartitions  = min(spark.default.parallelism, 2)

当 sc.defaultParallelism 和 sc.defaultMinPartitions 确认后,就可以推算 RDD 的分区数了。

  • 以 parallelize 方法为例

val rdd = sc.parallelize(1 to 10)

如果使用 parallelize 方法时没指定分区数, RDD 的分区数 = sc.defaultParallelism

  • 以 textFile 方法为例

val rdd = sc.textFile(“path/file”)

分区机制分两种情况:

1.从本地文件生成的 RDD,如果没有指定分区数,则默认分区数规则为

rdd 的分区数 = max(本地 file 的分片数, sc.defaultMinPartitions)

2.从 HDFS 生成的 RDD,如果没有指定分区数,则默认分区数规则为:

rdd 的分区数 = max(hdfs 文件的 block 数目, sc.defaultMinPartitions)

RDD 的常用操作

RDD 支持两种类型的操作:转换(Transformation)和动作(Action),转换操作是从已经存在的数据集中创建一个新的数据集,而动作操作是在数据集上进行计算后返回结果到 Driver,既触发 SparkContext 提交 Job 作业。转换操作都具有 Lazy 特性,即 Spark 不会立刻进行实际的计算,只会记录执行的轨迹,只有触发行动操作的时候,它才会根据 DAG 图真正执行。

转换与动作具体包含的操作种类如下图所示:

1.转换操作2.动作操作

最后我们通过一段代码来看看它具体的操作:

这段代码是用来计算某个视频被男性或女性用户的播放次数,其中 rdd_attr 用来记录用户性别,rdd_src 是用户对某个视频进行播放的记录,这两个 RDD 会进行一个 join 操作,比如这是某个男性用户对某个视频进行了播放,进行 map 操作之后得到视频 id 和性别作为 key,根据这个 key 做 reduceByKey 的操作,最终得到一个视频被男性/女性用户总共播放了多少次的 RDD,然后使用 combineByKey 合并同一个视频 id 的多个结果,最后保存到 HDFS 上。


本文转载自异步社区。

文链接

https://www.epubit.com/articleDetails?id=N38bafe22-7d66-414d-908d-de6d8dbcf1de

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。