Spark_常规性能调优(二)

举报
bigdata张凯翔 发表于 2021/03/26 01:51:12 2021/03/26
【摘要】 可以从数据结构的哪几个方面优化Spark?举例说明 1、优先使用数组以及字符串,而不是集合类。也就是说,优先使用array,而不是ArrayList、LinkedList、HashMap等集合。 比如:企业应用中的做法是,对于对于HashMap、List这种数据结构,统一用String拼接成特殊格式的字符串,比如Map<Integer,Person> persons = new...

可以从数据结构的哪几个方面优化Spark?举例说明

1、优先使用数组以及字符串,而不是集合类。也就是说,优先使用array,而不是ArrayList、LinkedList、HashMap等集合。
比如:企业应用中的做法是,对于对于HashMap、List这种数据结构,统一用String拼接成特殊格式的字符串,比如Map<Integer,Person> persons = new HashMap<Integer,Person>()。可以优化为,特殊的字符串格式:id:name,address| id:name,address….这样的数据格式。
2、避免使用多层嵌套的对象结构。
比如:public class Teacher{private List<Student> student = new ArrayList<Student>()} , 这就是非常不好的例子。因为Teacher类的内部又嵌套了大量的小Student对象,
优化:我们使用特殊的字符串来进行数据的存储。比如,用json字符串来存储数据,就是一个非常好的选择。
{“teacher”:1,”tracherName”:”leo”,students:[{“studentid”:1,”studentName”:”tome”}]}。
3、对于有些能够避免的场景,尽量使用int替代String,虽然String比ArrayList、HashMap等数据结构高效多,占用内存量少,但是,还有会有额外的信息消耗,就想对于id这样的字段,我们完全可以使用int代替。

如何使用序列化的持久化级别?

由于RDD的数据是持久化到内存,或者是磁盘中的,此时,如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如:MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等这种带ser的就代表序列化持久化级别,使用方式为:

RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法即可。
这样是将数据序列化然后在持久化,大大的减小对内存的消耗。

spark性能优化之提高并行度调优

1、(★★★★★重要)如何提高Spark的并行度?举例说明

答:当我们有两个Executor,每个Executor都有五个cpu core,
当我们设置参数new SparkConf().set(“spark.default.parallelism”,”5”)
这个参数的意思就是,把所有的RDD的partition都设置成5,也就是说明每个RDD的数据都会被分成五份。那么针对RDD的partition,一个partition就会启动一个线程task来进行计算。但是在集群中,我们分明分配了十个cpu core,但是只会使用五个task,使用五个cpu core,那么剩下的五个就会被浪费掉。

优化:我们完全可以设置相同的task,让十个cpu core都在运行,甚至设置20-30个,因为每个task运行结束的时候不一样,可能某个task很快就完成了,那么cpu又空闲了。
官网建议,设置集群总cpu数量的两倍~~三倍的并行度,这样的话,每个cpu core可能分配到并发运行 2 ~ 3 个task线程,那么这样就会连续、运转、最大的发挥他的功效来。

举例:当我们的spark-submit设置了executor的数量是10个,每个executor要求分配2个core,那么application总共会有20个core,此时我们可以设置:

new SparkConf().set(“spark.default.parallelism”,60)合理设置并行度。

spark性能优化之对多次使用的RDD进行持久化或者是checkpoint

spark性能优化之数据本地化

1、什么是数据本地化?有几种数据本地化级别?优化原理是什么?

答:数据本地化:指的是,数据离计算它的代码有多近,也就是我们要计算的数据和执行代码节点之间的距离。

数据本地化级别:

(1)、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。
(2)、NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的executor进程中,或者是数据在HDFS文件的block中。
(3)、NO_PREF:数据从哪里过来,性能都是一样的。
(4)、RACK_LOCAL:数据和计算它的代码在一个机架上。
(5)、ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。

优化原理:

1、 当我们task要处理partition的数据,在某一个Executor中,然后,有TaskSchedulerImpl首先会用最好的本地化级别去启动task,尽量的在那个包含了要处理的partition的executor中,去启动task ,就是PROCESS_LOCAL级别。
2、 但是,当那个包含了partition的executor中已经运行了足够的task 的时候,这个时候,这个计算的task默认的情况下,会等待一会,等待该executor什么时候空闲出来了一个cpu core ,然后这个task来启动。这个等待是有时间限制的(这个时间限制可以调优),当发现始终没有空闲的cpu core 的时候,那么就会放大一个级别去启动这个task。
3、 当然这个task去别的地方计算的时候,会调用RDD的itrator()方法,然后通过executor关联的blockManager,来尝试获取数据。BlocakManager底层,首先尝试从getLocal()在本地找数据,如果没有找到,那么就调用getRemote()方法,通过BlockTransferService,链接到有数据的BlockManager,来获取数据。

spark内存消耗主要体现在哪里?

1、每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上比对象自己还要大。

2、Java的String对象,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列的,并且还得保存诸如数组长度之类的信息。而且因为String使用的是UTF-16编码,所以每个字符会占用2个字节。比如,包含10个字符的String,会占用60个字节。【结论:一个字符相当于占用6个字节】

3、Java中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,通常占用8个字节。

4、 元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如Integer,来存储元素。

如何判断你的程序消耗了多少内存?

1、首先,自己设置RDD的并行度, 有两种方式,
(1)、在parallelize()、textFile()等方法中,传入第二个参数,设置RDD的task/partition的数量。
(2)、用SparkConf.set()方法,设置一个参数,spark.default.parallelize,可以统一设置这个application所有的RDD的partition数量。
2、其实,在程序中将RDD cache到内存中,调用RDD.cache()方法即可。
3、最后,观察Driver的log,就会发现类似于:“INFO BlockManagerMasterActor.Added rdd_0_1 in memory on mbk.local:50311(size:717.5 KB , free: 333.3 MB)”的日志信息,这就显示了每个partition占用了多少内存。
4、将这个内存信息乘以partition数量,即可得出RDD的内存占用量。

文章来源: www.jianshu.com,作者:百忍成金的虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:www.jianshu.com/p/dff08d1f04cd

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。