《Spark Streaming实时流式大数据处理实战》 ——3.3 RDD操作

举报
华章计算机 发表于 2020/02/22 15:17:13 2020/02/22
【摘要】 本节书摘来自华章计算机《Spark Streaming实时流式大数据处理实战》 —— 书中第3章,第3.3.1节,作者是肖力涛 。

3.3  RDD操作

  有一定开发经验的读者应该都使用过多线程,利用多核CPU的并行能力来加快运算速率。在开发并行程序时,我们可以利用类似Fork/Join的框架将一个大的任务切分成细小的任务,每个小任务模块之间是相互独立的,可以并行执行,然后将所有小任务的结果汇总起来,得到最终的结果。

  一个非常好的例子便是归并排序。对整个序列进行排序时,可以将序列切分成多个子序列进行排序,然后将排好序的子序列归并起来得到最终的结果,如图3.1所示。

 image.png

图3.1  归并排序Fork/Join

  对Hadoop有所了解的读者都知道map、reduce操作。对于大量的数据,我们可以通过map操作让不同的集群节点并行计算,之后通过reduce操作将结果整合起来得到最终输出。

  而对于Spark处理的大量数据而言,会将数据切分后放入RDD作为Spark的基本数据结构,开发者可以在RDD上进行丰富的操作,之后Spark会根据操作调度集群资源进行计算。总结起来,RDD的操作主要可以分为Transformation和Action两种。

3.3.1  Transformation操作

  在Spark中,Transformation操作表示将一个RDD通过一系列操作变为另一个RDD的过程,这个操作可能是简单的加减操作,也可能是某个函数或某一系列函数。值得注意的是,Transformation操作并不会触发真正的计算,只会建立RDD间的关系图。

* map(f:T=>U) : RDD[T]=>RDD[U],表示将RDD经由某一函数f后,转变为另一个RDD。

* filter(f:T=>Bool) : RDD[T]=>RDD[T],表示将RDD经由某一函数f后,只保留f返回为true的数据,组成新的RDD。

* flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U]),表示将RDD经由某一函数f后,转变为一个新的RDD,但是与map不同,RDD中的每一个元素会被映射成新的0到多个元素(f函数返回的是一个序列Seq)。

* mapPartitions(f:Iterator[T]=>Iterator[U], preservesPartitioning:Boolean = false) : RDD[U],表示将一个RDD经由某一函数f后,转变为另一个RDD,但是在操作RDD内部的元素时,是按照分区进行操作的。如果我们需要在映射过程中创建对象,那么可以利用mapPartitions操作,从而仅仅每个分区创建一次(在后面对数据库操作时能进一步感受到)。

* mapPartitionsWithIndex(f:(Int, Iterator[T])=>Iterator[U], preservesPartitioning: Boolean = false) : RDD[U],与mapPartitions类似,不过该操作提供了一个额外的整型参数,用于指定partition的索引值。

* sample(withReplacement: Boolean, fraction: Double, seed: Long) :RDD[T]=>RDD[T],该函数是一个统计机器学习常用到的抽样函数,将一个RDD内的元素通过用户设定的百分比、随机种子、有放回抽样等进行采样,获取元素组成一个新的RDD。其中,参数withReplacement表示是否放回抽样。

  如图3.2所示,RDD内部每个方框是一个分区。假设需要采样50%的数据,通过sample函数,从V1、V2、U1、U2、U3、U4采样出数据V1、U1和U4,形成新的RDD。

 image.png

图3.2  RDD sample操作

* groupByKey([numTasks]) : RDD[(K,V)]=>RDD[(K,Seq[V])],该函数针对包含键值对数据的RDD进行操作,会将RDD内部的数据,根据键值key,经由某一函数合并值value,从而新生成一个RDD,包含(K,Seq[V])对的数据集。

?注意:(1)该操作可以改变numTasks来增大或建设任务数量,默认与分区个数一致。

(2)另外,可以多使用reduceByKey操作,其在求和、求平均时表现更好。

* reduceByKey(f:(V,V)=>V, [numTasks]) : RDD[(K, V)]=>RDD[(K, V)],也是针对包含键值对数据的RDD进行操作,根据内部的键值,将value合并起来,同样地,可以利用numTasks设置任务数。

* union(otherDataset) : (RDD[T],RDD[T])=>RDD[T],表示将两个不同的RDD合并成为一个新的RDD。

* intersection(otherDataset): (RDD[T],RDD[T])=>RDD[T],表示将两个RDD内部的数据集求交集,产生一个新的RDD。

* distinct([numTasks]),表示对原RDD中的数据集进行去重操作。

* join(otherDataset, [numTasks]) : (RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(V,W))],返回key值相同的所有匹配对,如图3.3所示。

 image.png

图3.3  RDD join操作

  将两个RDD内部键值相同的数据,合并成key,pair(value1, value2)的形式。

* cogroup() :(RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(Seq[V],Seq[W]))],也是针对包含键值对数据的RDD,先将两个RDD中键相同的元素,将其值分别聚合成为一个集合,之后利用集合的迭代器,以(Key, (Iterable[V], Iterable[w]))的形式返回形成新的RDD。

* cartesian(otherDataset) : (RDD[T],RDD[U])=>RDD[(T,U)],表示对于两个RDD内部的数据集,将数据集进行笛卡尔积后,产生新的(T,U)二元组,构成新的RDD。

* sortByKey([ascending], [numTasks]) :RDD[(K,V)]=>RDD[(K,V)],表示根据key值进行排序,如果ascending设置为true,则按照升序排序。

* repartition(numPartitions):对RDD中的所有数据进行shuffle操作,对原有RDD中的分区重新划分,减少或者增加分区,使不同节点间的运算更加平衡。该操作通常会通过网络传输来打乱(shuffle)所有数据。


【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。