Spark 编程模型(上)

举报
Smy1121 发表于 2019/06/22 14:53:30 2019/06/22
【摘要】 从Hadoop MR到Spark

从Hadoop MR到Spark

回顾hadoop—mapreduce计算过程

image.png


MR VS Spark

image.png


Spark编程模型

核心概念

image.png

注意:对比mr里的概念来学习


Spark Application的组成

image.png


Spark应用程序的组成

● Driver

● Executor


注意:对照helloworld来思考

Spark Application基本概念

image.png


Spark Application编程模型

Spark 应用程序编程模型

image.png


– Driver Program ( SparkContext )

– Executor ( RDD 操作)

● 输入Base-> RDD

● Transformation RDD->RDD

● Action RDD->driver or Base

● 缓存 Persist or cache()


– 共享变量

● broadcast variables(广播变量)

● accumulators(累加器)


回顾Spark Hello World

image.png


初识RDD

什么是RDD

定义:Resilient distributed datasets (RDD), an efficient, general-purpose and fault-tolerant abstraction for sharing data in cluster applications.


RDD 是只读的。

RDD 是分区记录的集合。

RDD 是容错的。--- lineage

RDD 是高效的。

RDD 不需要物化。---物化:进行实际的变换并最终写入稳定的存储器上

RDD 可以缓存的。---课指定缓存级别


RDD是spark的核心,也是整个spark的架构基础,RDD是弹性分布式集合(Resilient Distributed Datasets)的简称,是分布式只读且已分区集合对象。这些集合是弹性的,如果数据集一部分丢失,则可以对它们进行重建。


RDD接口

image.png


RDD的本质特征

image.png


RDD--partitions

Spark中将1~100的数组转换为rdd

image.png


通过第15行的size获得rdd的partition的个数,此处创建rdd显式指定定分区个数2,默认数值是这个程序所分配到的资源的cpu核的个数


RDD-preferredLocations

返回此RDD的一个partition的数据块信息,如果一个数据块(block)有多个备份在返回所有备份的location地址信息

主机ip或域名


作用:spark在进行任务调度室尽可能根据block的地址做到本地计算


RDD-dependencies

RDD之间的依赖关系分为两类:

● 窄依赖

每个父RDD的分区都至多被一个子RDD的分区使用,即为OneToOneDependecies;


● 宽依赖

多个子RDD的分区依赖一个父RDD的分区,即为ShuffleDependency 。例如,map操作是一种窄依赖,而join操作是一种宽依赖(除非父RDD已经基于Hash策略被划分过了,co-partitioned)


image.png


窄依赖相比宽依赖更高效资源消耗更少


允许在单个集群节点上流水线式执行,这个节点可以计算所有父级分区。例如,可以逐个元素地依次执行filter操作和map操作。


相反,宽依赖需要所有的父RDD数据可用并且数据已经通过类MapReduce的操作shuffle完成。


在窄依赖中,节点失败后的恢复更加高效。


因为只有丢失的父级分区需要重新计算,并且这些丢失的父级分区可以并行地在不同节点上重新计算。


与此相反,在宽依赖的继承关系中,单个失败的节点可能导致一个RDD的所有先祖RDD中的一些分区丢失,导致计算的重新执行。


RDD-compute

分区计算


Spark对RDD的计算是以partition为最小单位的,并且都是对迭代器进行复合,不需要保存每次的计算结果


RDD- partitioner

分区函数:目前spark中提供两种分区函数:

HashPatitioner(哈希分区)

RangePatitioner(区域分区)

且partitioner只存在于(K,V)类型的RDD中,rdd本身决定了分区的数量。


RDD- lineage

val lines = sc.textFile("hdfs://...")

// transformed RDDs

val errors = lines.filter(_.startsWith("ERROR"))

val messages = errors.map(_.split("\t")).map(r => r(1))

messages.cache()

// action 1

messages.filter(_.contains("mysql")).count()

// action 2

messages.filter(_.contains("php")).count()

image.png


image.png


RDD经过trans或action后产生一个新的RDD,RDD之间的通过lineage来表达依赖关系,lineage是rdd容错的重要机制,rdd转换后的分区可能在转换前分区的节点内存中

image.png


典型RDD的特征

image.png


不同角度看RDD

image.png

Scheduler Optimizations

spacer.gif


【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。