2021年大数据Spark(十六):Spark Core的RDD算子练习

举报
Lansonli 发表于 2021/09/29 00:54:11 2021/09/29
【摘要】 目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集、并集、差集、笛卡尔积 distinct 算子 ​​​​​​​​​​​​​​first、take、top 算子 ​​​​​​​​​​​​​​keys、values 算子 ​​​​​​​mapValues 算子 ​​​...

目录

RDD算子练习

map 算子

filter 算子

flatMap 算子

交集、并集、差集、笛卡尔积

distinct 算子

​​​​​​​​​​​​​​first、take、top 算子

​​​​​​​​​​​​​​keys、values 算子

​​​​​​​mapValues 算子

​​​​​​​collectAsMap 算子

​​​​​​​mapPartitionsWithIndex 算子


RDD算子练习

    RDD中的函数有很多,不同业务需求使用不同函数进行数据处理分析,下面仅仅展示出比较常用的函数使用,更多函数在实际中使用体会,多加练习理解。

map 算子

对RDD中的每一个元素进行操作并返回操作的结果。


  
  1. //通过并行化生成rdd
  2. val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))  
  3. //对rdd1里的每一个元素
  4. rdd1.map(_ * 2).collect  //collect方法表示收集,是action操作
  5. //res4: Array[Int] = Array(10, 12, 8, 14, 6, 16, 4, 18, 2, 20)

 

filter 算子

函数中返回True的被留下,返回False的被过滤掉。


  
  1. val rdd2 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
  2. val rdd3 = rdd2.filter(_ >= 10) //大于等于10的留下
  3. rdd3.collect //10

 

flatMap 算子

对RDD中的每一个元素进行先map再压扁,最后返回操作的结果。

对RDD中的每一个元素进行先map再压扁,最后返回操作的结果


  
  1. val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
  2. //将rdd1里面的每一个元素先切分再压平
  3. val rdd2 = rdd1.flatMap(_.split(' '))//_是每一个元素,如其中一个:"a b c"   
  4. rdd2.collect
  5. //Array[String] = Array(a, b, c, d, e, f, h, i, j)

 

​​​​​​​交集、并集、差集、笛卡尔积

类似Scala集合类Set中相关函数,注意类型要一致。

注意类型要一致


  
  1. val rdd1 = sc.parallelize(List(5, 6, 4, 3))
  2. val rdd2 = sc.parallelize(List(1, 2, 3, 4))
  3. //union并集不会去重
  4. val rdd3 = rdd1.union(rdd2) 
  5. rdd3.collect//Array[Int] = Array(5, 6, 4, 3, 1, 2, 3, 4)
  6. //去重
  7. rdd3.distinct.collect
  8. //求交集
  9. val rdd4 = rdd1.intersection(rdd2)
  10. rdd4.collect
  11. //求差集
  12. val rdd5 = rdd1.subtract(rdd2)
  13. rdd5.collect
  14. //笛卡尔积
  15. val rdd1 = sc.parallelize(List("jack", "tom"))//学生
  16. val rdd2 = sc.parallelize(List("java", "python", "scala"))//课程
  17. val rdd3 = rdd1.cartesian(rdd2)
  18. //可以表示所有学生的所有可能的选课情况
  19. rdd3.collect//Array((jack,java), (jack,python), (jack,scala), (tom,java), (tom,python), (tom,scala))

 

​​​​​​​distinct 算子

对RDD中元素进行去重,与Scala集合中distinct类似。


  
  1. val rdd = sc.parallelize(Array(1,2,3,4,5,5,6,7,8,1,2,3,4), 3)
  2. rdd.distinct.collect

 

​​​​​​​​​​​​​​first、take、top 算子

从RDD中获取某些元素,比如first为第一个元素,take为前N个元素,top为最大的N个元素。


  
  1. val rdd1 = sc.parallelize(List(3,6,1,2,4,5))
  2. rdd1.top(2)// 6 5
  3. //按照原来的顺序取前N个
  4. rdd1.take(2) //3 6
  5. //按照原来的顺序取前第一个
  6. rdd1.first

​​​​​​​​​​​​​​keys、values 算子

针对RDD中数据类型为KeyValue对时,获取所有key和value的值,类似Scala中Map集合。


  
  1. val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
  2. val rdd2 = rdd1.map(x => (x.length, x))
  3. rdd2.collect
  4. //Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))
  5. rdd2.keys.collect
  6. //Array[Int] = Array(3, 5, 4, 3, 7, 5)
  7. rdd2.values.collect
  8. //Array[String] = Array(dog, tiger, lion, cat, panther, eagle)   

​​​​​​​mapValues 算子


  
  1. mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后。
  2. mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后
  3. val rdd1 = sc.parallelize(List((1,10),(2,20),(3,30)))
  4. val rdd2 = rdd1.mapValues(_*2).collect //_表示每一个value ,key不变,将函数作用于value
  5. // Array[(Int, Int)] = Array((1,20), (2,40), (3,60))

​​​​​​​collectAsMap 算子

当RDD中数据类型为Key/Value对时,转换为Map集合。


  
  1. val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
  2. rdd.collectAsMap
  3. //scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)/Map((b ,2), (a , 1)) //Scala中Map底层就是多个二元组

​​​​​​​mapPartitionsWithIndex 算子

取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的。

功能:取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的


  
  1. val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
  2. //该函数的功能是将对应分区中的数据取出来,并且带上分区编号
  3. val func = (index: Int, iter: Iterator[Int]) => {
  4.   iter.map(x => "[partID:" +  index + ", val: " + x + "]")
  5. }
  6. rdd1.mapPartitionsWithIndex(func).collect
  7. //Array[String] = Array(
  8. //[partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3],
  9. //[partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6],
  10. //[partID:2, val: 7], [partID:2, val: 8], [partID:2, val: 9]

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/115682499

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。