【SparkAPI JAVA版】JavaPairRDD——cache、persist、unpersist、getStorageL

举报
Copy工程师 发表于 2022/01/20 20:17:37 2022/01/20
【摘要】 说明这四个方法都和缓存有关,所以写在一块。 JavaPairRDD的cache方法讲解 官方文档说明Persist this RDD with the default storage level (MEMORY_ONLY). 中文含义cache就是在内存中缓存数据,其实也是使用的persist。使用非序列化的方式将RDD的数据全部尝试持久化到内存中,cache()只是一个transform...

说明

这四个方法都和缓存有关,所以写在一块。

JavaPairRDD的cache方法讲解

官方文档说明
Persist this RDD with the default storage level (MEMORY_ONLY).
中文含义

cache就是在内存中缓存数据,其实也是使用的persist。使用非序列化的方式将RDD的数据全部尝试持久化到内存中,cache()只是一个transformtion,是lazy的,必须通过一个action触发,才能真正的将该RDD cache到内存中。

方法原型
//scala
def cache(): JavaPairRDD[K, V]
//java
public JavaPairRDD<K,V> cache()

JavaPairRDD的persist方法讲解

官方文档说明
Set this RDD's storage level to persist its values across operations after the first time it is computed. Can only be called once on each RDD.
中文含义

将此RDD的存储级别设置为在第一次计算后跨操作持久化其值。每个RDD只能调用一次。

方法原型
//scala
def persist(newLevel: StorageLevel): JavaPairRDD[K, V]
//java
public JavaPairRDD<K,V> persist(StorageLevel newLevel)

JavaPairRDD的unpersist方法讲解

官方文档说明
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
blocking
中文含义

将RDD标记为非持久性的,并从内存和磁盘中删除它的所有块。

方法原型
//scala
def unpersist(): JavaPairRDD[K, V]
def persist(newLevel: StorageLevel): JavaPairRDD[K, V]
//java
public JavaPairRDD<K,V> unpersist()
public JavaPairRDD<K,V> persist(StorageLevel newLevel)

JavaPairRDD的getStorageLevel方法讲解

官方文档说明
Get the RDD's current storage level, or StorageLevel.NONE if none is set.
中文含义

获取RDD的当前存储级别,如果未设置,则获取StorageLevel.NONE。

方法原型
//scala
def getStorageLevel: StorageLevel
//java
public static StorageLevel getStorageLevel()
cache、persist、unpersist的说明

cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间。
其中cache这个方法是一个Tranformation,当第一次遇到Action算子的时才会进行持久化。

cache的源码:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

persist源码:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

persist(StorageLevel.MEMORY_ONLY)源码:

/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet..
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}

从源码中可以看得出来cache内部调用了persist方法,persist方法又调用了persist(StorageLevel.MEMORY_ONLY)方法,所以执行cache算子其实就是执行了persist算子且持久化级别为MEMORY_ONLY

两者的区别:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。

persist有一个 StorageLevel 类型的参数,这个表示的是RDD的缓存级别。

StorageLevel 说明(参考博客):

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}

1.MEMORY_ONLY

使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。

2.MEMORY_AND_DISK

使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。

3.MEMORY_ONLY_SER

基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。

4.MEMORY_AND_DISK_SER

基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。

5.DISK_ONLY

使用未序列化的Java对象格式,将数据全部写入磁盘文件中。

6.MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等

对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

这里列出了12种缓存级别,但这些有什么区别呢?可以看到每个缓存级别后面都跟了一个StorageLevel的构造函数,里面包含了4个或5个参数,如下

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

查看其构造函数

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}

可以看到StorageLevel类的主构造器包含了5个参数:
useDisk:使用硬盘(外存)
useMemory:使用内存
useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
replication:备份数(在多个节点上备份)
理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)
另外还注意到有一种特殊的缓存级别

 val OFF_HEAP = new StorageLevel(false, false, true, false)

使用了堆外内存,StorageLevel 类的源码中有一段代码可以看出这个的特殊性,它不能和其它几个参数共存。

if (useOffHeap) {
  require(!useDisk, "Off-heap storage level does not support using disk")
  require(!useMemory, "Off-heap storage level does not support using heap memory")
  require(!deserialized, "Off-heap storage level does not support deserialized storage")
  require(replication == 1, "Off-heap storage level does not support multiple replication")
}

建议:
默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

unpersist、getStorageLevel

unpersist表示取消缓存,删除掉缓存块。默认unpersist的blocking参数是true
getStorageLevel 获取的是缓存存储级别,例如: StorageLevel(memory, deserialized, 1 replicas)

cache注意事项

cache之后一定不能直接去接算子。因为cache后有算子的话,它每次都会重新触发这个计算过程,从而导致cache失效。
cache操作需要当第一个使用到它的job执行后才会生效,而不是cache后马上可用,这是spark框架的延迟计算导致的。可能粗想起来也不会有什么问题,但是不正确的使用unpersist操作,也可能会导致cache失效。如下例子所示,在action操作之前就把缓存释放掉:

val data = sc.textFile(“data.csv”).flatMap(_.split(,)).cache() 
val data1 = data.map(word => (word, 1)).reduceByKey(_ + _) 
val data2 = data.map(word => (word, word.length)).reduceByKey(_ + _) 
data.unpersist() 
val wordCount1 = data1.count() 
val wordCount2 = data2.count()

如何释放cache缓存:unpersist,它是立即执行的。persist是lazy级别的(没有计算),unpersist是eager级别的。RDD cache的生命周期是application级别的,也就是如果不显示unpersist释放缓存,RDD会一直存在(虽然当内存不够时按LRU算法进行清除),如果不正确地进行unpersist,让无用的RDD占用executor内存,会导致资源的浪费,影响任务的效率。

实例
public class Cache {

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");

        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> javaRDD = sc.parallelize(Lists.newArrayList("a","b"));
        JavaRDD<String> javaRDD1 = sc.parallelize(Lists.newArrayList("1","2","3"));
        // cache 数据
        javaRDD.cache();
        System.out.println(javaRDD.count());
        System.out.println(javaRDD.getStorageLevel());


        // persist
        javaRDD1.persist(StorageLevel.MEMORY_AND_DISK_SER());
        System.out.println(javaRDD1.count());
        System.out.println(javaRDD1.getStorageLevel());

        // unpersist
        javaRDD.unpersist();
        javaRDD1.unpersist();

    }
}
结果

2
StorageLevel(memory, deserialized, 1 replicas)

3
StorageLevel(disk, memory, 1 replicas)
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。