Spark shuffle介绍:概述
在基于mapreduce思想的计算模型里,Shuffle是map和reduce的纽带。计算框架对大数据分而治之,对处理数据进行分块并行处理,当需要对分块数据做聚合处理时,多个分块的数据在map阶段转为k-v结构,然后按key分区,在reduce阶段对各自分区的数据进行计算归并。map和reduce中间对数据做分区并规整的过程,就是shuffle的过程。
在spark中,对shuffle也从RDD的角度进行了定义。spark core的作业就是rdd的一系列转换,从aRDD转为bRDD,RDD之间有宽依赖和窄依赖,对于存在宽依赖的2个RDD之间,就会存在shuffle。
窄依赖:父RDD的每个分区数据都只被RDD的一个分区使用
宽依赖:父RDD的每个分区数据都被子RDD的多个分区使用
在hadoop、spark这类批处理的计算框架中,考虑到容错,shuffle过程的数据都是序列化到了磁盘。map做shuffle write,reduce做shuffle read。这个过程涉及了CPU对数据序列化及各种内存拷贝、内存对数据做分区排序、磁盘对shuffle数据存储读取、以及网络远程fetch,是计算过程中的资源消耗大户,也是最大的瓶颈。
下面以wordcount为例子,概要描述下作业shuffle过程,目标为统计输入文件里每个单词的数量。
比如有2个文本文件,文件1内容为: abc def 文件2内容为:abc ghi。
期望结果为:abc 2;def 1;ghi 1(abc有2个,def有1个,ghi有1个)
整个数据流如下:
1、作业运行时首先启2个task,分别读取文件1和文件2,每个task将自己读到文件内容split为单词
2、每个task将读到的单词,转为key-value结构,即word-数量的结构
3、每个task将key-value数据按key做分区,假设目标分区为3个,3个单词abc、def、ghi分别分到3个分区,并各自shuffle write为对应的文件
4、启动3个reduce任务,每个reduce任务读取对应分区的数据
5、每个reduce任务对相同的单词合并,数量合计
6、每个reduce任务的结果合并到driver输出结果
为了并行处理数据,需要启动多个task进行文件的读取和split。但count是对全部的单词做count,每个task里只包含了某1个文件的单词,这种就需要shuffle,把相同的单词先分到同一个分区里,再分别对每个分区做count,即可得到最终结果。这个简单的例子帮到你理解为什么需要shuffle了么。
- 点赞
- 收藏
- 关注作者
评论(0)