一零一二、Spark- RDD-DF-DS 相互转换

举报
托马斯-酷涛 发表于 2022/06/21 00:33:06 2022/06/21
【摘要】 输入文件   代码 package example.spark.sql import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.ap...

输入文件

 

代码


  
  1. package example.spark.sql
  2. import org.apache.log4j.{Level, Logger}
  3. import org.apache.spark.SparkContext
  4. import org.apache.spark.rdd.RDD
  5. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
  6. object RDD_DF_DS {
  7. def main(args: Array[String]): Unit = {
  8. val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[6]").config("spark.sql.warehouse.dir", "E:/").getOrCreate()
  9. val sc: SparkContext = spark.sparkContext
  10. Logger.getLogger("org").setLevel(Level.ERROR)
  11. val lines: RDD[String] = sc.textFile("data/input/csv")
  12. // val personRDD: RDD[(Int,String,Int)] = lines.map(line => {
  13. val personRDD: RDD[Person] = lines.map(line => {
  14. val str: Array[String] = line.split(",")
  15. // (str(0).toInt, str(1), str(2).toInt)
  16. Person(str(0).toInt, str(1), str(2).toInt)
  17. })
  18. //转换1: RDD->DF
  19. import spark.implicits._
  20. val personDF: DataFrame = personRDD.toDF("id", "name", "age")
  21. //转换2:RDD->DS
  22. val personDS: Dataset[Person] = personRDD.toDS()
  23. //转换3:DF->RDD DF没有泛型
  24. val rdd: RDD[Row] = personDF.rdd
  25. //转换4:DS->RDD
  26. val rdd1: RDD[Person] = personDS.rdd
  27. //转换5:DF-->DS
  28. val ds: Dataset[Person] = personDF.as[Person]
  29. //转换6:DS-->DF
  30. val df: DataFrame = personDS.toDF()
  31. personDF.show()
  32. personDS.show()
  33. rdd.foreach(println)
  34. rdd1.foreach(println)
  35. //关闭资源
  36. spark.stop()
  37. }
  38. case class Person(id: Int, name: String, age: Int)
  39. }

结果打印


  
  1. +---+--------+---+
  2. | id| name|age|
  3. +---+--------+---+
  4. | 1|zhangsan| 20|
  5. | 2| lisi| 29|
  6. | 3| wangwu| 25|
  7. | 4| zhaoliu| 30|
  8. | 5| tianqi| 35|
  9. | 6| kobe| 40|
  10. +---+--------+---+
  11. +---+--------+---+
  12. | id| name|age|
  13. +---+--------+---+
  14. | 1|zhangsan| 20|
  15. | 2| lisi| 29|
  16. | 3| wangwu| 25|
  17. | 4| zhaoliu| 30|
  18. | 5| tianqi| 35|
  19. | 6| kobe| 40|
  20. +---+--------+---+
  21. [1,zhangsan,20]
  22. [2,lisi,29]
  23. [4,zhaoliu,30]
  24. [3,wangwu,25]
  25. [5,tianqi,35]
  26. [6,kobe,40]
  27. Person(1,zhangsan,20)
  28. Person(4,zhaoliu,30)
  29. Person(5,tianqi,35)
  30. Person(6,kobe,40)
  31. Person(2,lisi,29)
  32. Person(3,wangwu,25)

1)DF/DS转RDD
Val Rdd = DF/DS.rdd
2) DS/RDD转DF
import spark.implicits._
调用 toDF(就是把一行数据封装成row类型)
3)RDD转DS
将RDD的每一行封装成样例类,再调用toDS方法
4)DF转DS
根据row字段定义样例类,再调用asDS方法[样例类]

特别注意:在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。

文章来源: tuomasi.blog.csdn.net,作者:托马斯-酷涛,版权归原作者所有,如需转载,请联系作者。

原文链接:tuomasi.blog.csdn.net/article/details/125366529

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。