《Spark Streaming实时流式大数据处理实战》 ——3.3 RDD操作
3.3 RDD操作
有一定开发经验的读者应该都使用过多线程,利用多核CPU的并行能力来加快运算速率。在开发并行程序时,我们可以利用类似Fork/Join的框架将一个大的任务切分成细小的任务,每个小任务模块之间是相互独立的,可以并行执行,然后将所有小任务的结果汇总起来,得到最终的结果。
一个非常好的例子便是归并排序。对整个序列进行排序时,可以将序列切分成多个子序列进行排序,然后将排好序的子序列归并起来得到最终的结果,如图3.1所示。
图3.1 归并排序Fork/Join
对Hadoop有所了解的读者都知道map、reduce操作。对于大量的数据,我们可以通过map操作让不同的集群节点并行计算,之后通过reduce操作将结果整合起来得到最终输出。
而对于Spark处理的大量数据而言,会将数据切分后放入RDD作为Spark的基本数据结构,开发者可以在RDD上进行丰富的操作,之后Spark会根据操作调度集群资源进行计算。总结起来,RDD的操作主要可以分为Transformation和Action两种。
3.3.1 Transformation操作
在Spark中,Transformation操作表示将一个RDD通过一系列操作变为另一个RDD的过程,这个操作可能是简单的加减操作,也可能是某个函数或某一系列函数。值得注意的是,Transformation操作并不会触发真正的计算,只会建立RDD间的关系图。
* map(f:T=>U) : RDD[T]=>RDD[U],表示将RDD经由某一函数f后,转变为另一个RDD。
* filter(f:T=>Bool) : RDD[T]=>RDD[T],表示将RDD经由某一函数f后,只保留f返回为true的数据,组成新的RDD。
* flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U]),表示将RDD经由某一函数f后,转变为一个新的RDD,但是与map不同,RDD中的每一个元素会被映射成新的0到多个元素(f函数返回的是一个序列Seq)。
* mapPartitions(f:Iterator[T]=>Iterator[U], preservesPartitioning:Boolean = false) : RDD[U],表示将一个RDD经由某一函数f后,转变为另一个RDD,但是在操作RDD内部的元素时,是按照分区进行操作的。如果我们需要在映射过程中创建对象,那么可以利用mapPartitions操作,从而仅仅每个分区创建一次(在后面对数据库操作时能进一步感受到)。
* mapPartitionsWithIndex(f:(Int, Iterator[T])=>Iterator[U], preservesPartitioning: Boolean = false) : RDD[U],与mapPartitions类似,不过该操作提供了一个额外的整型参数,用于指定partition的索引值。
* sample(withReplacement: Boolean, fraction: Double, seed: Long) :RDD[T]=>RDD[T],该函数是一个统计机器学习常用到的抽样函数,将一个RDD内的元素通过用户设定的百分比、随机种子、有放回抽样等进行采样,获取元素组成一个新的RDD。其中,参数withReplacement表示是否放回抽样。
如图3.2所示,RDD内部每个方框是一个分区。假设需要采样50%的数据,通过sample函数,从V1、V2、U1、U2、U3、U4采样出数据V1、U1和U4,形成新的RDD。
图3.2 RDD sample操作
* groupByKey([numTasks]) : RDD[(K,V)]=>RDD[(K,Seq[V])],该函数针对包含键值对数据的RDD进行操作,会将RDD内部的数据,根据键值key,经由某一函数合并值value,从而新生成一个RDD,包含(K,Seq[V])对的数据集。
?注意:(1)该操作可以改变numTasks来增大或建设任务数量,默认与分区个数一致。
(2)另外,可以多使用reduceByKey操作,其在求和、求平均时表现更好。
* reduceByKey(f:(V,V)=>V, [numTasks]) : RDD[(K, V)]=>RDD[(K, V)],也是针对包含键值对数据的RDD进行操作,根据内部的键值,将value合并起来,同样地,可以利用numTasks设置任务数。
* union(otherDataset) : (RDD[T],RDD[T])=>RDD[T],表示将两个不同的RDD合并成为一个新的RDD。
* intersection(otherDataset): (RDD[T],RDD[T])=>RDD[T],表示将两个RDD内部的数据集求交集,产生一个新的RDD。
* distinct([numTasks]),表示对原RDD中的数据集进行去重操作。
* join(otherDataset, [numTasks]) : (RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(V,W))],返回key值相同的所有匹配对,如图3.3所示。
图3.3 RDD join操作
将两个RDD内部键值相同的数据,合并成key,pair(value1, value2)的形式。
* cogroup() :(RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(Seq[V],Seq[W]))],也是针对包含键值对数据的RDD,先将两个RDD中键相同的元素,将其值分别聚合成为一个集合,之后利用集合的迭代器,以(Key, (Iterable[V], Iterable[w]))的形式返回形成新的RDD。
* cartesian(otherDataset) : (RDD[T],RDD[U])=>RDD[(T,U)],表示对于两个RDD内部的数据集,将数据集进行笛卡尔积后,产生新的(T,U)二元组,构成新的RDD。
* sortByKey([ascending], [numTasks]) :RDD[(K,V)]=>RDD[(K,V)],表示根据key值进行排序,如果ascending设置为true,则按照升序排序。
* repartition(numPartitions):对RDD中的所有数据进行shuffle操作,对原有RDD中的分区重新划分,减少或者增加分区,使不同节点间的运算更加平衡。该操作通常会通过网络传输来打乱(shuffle)所有数据。
- 点赞
- 收藏
- 关注作者
评论(0)