客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu

举报
Lansonli 发表于 2022/02/24 00:38:01 2022/02/24
【摘要】 Spark操作Kudu dataFrame操作kudu 一、DataFrameApi读取kudu表中的数据 虽然我们可以通过上面显示的KuduContext执行大量操作,但我们还可以直接从默认数据源本身调用读/写API。要设置读取,我们需要为Kudu表指定选项,命名我们要读取的表以及为表提供服务的Kudu集群的Kudu主服务器列...

Spark操作Kudu dataFrame操作kudu

一、DataFrameApi读取kudu表中的数据

虽然我们可以通过上面显示的KuduContext执行大量操作,但我们还可以直接从默认数据源本身调用读/写API。要设置读取,我们需要为Kudu表指定选项,命名我们要读取的表以及为表提供服务的Kudu集群的Kudu主服务器列表。

  • 代码示例

  
  1. /**
  2. * 使用DataFrameApi读取kudu表中的数据
  3. * @param sparkSession
  4. * @param kuduMaster
  5. * @param tableName
  6. */
  7. def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = {
  8. //定义map集合,封装kudu的master地址和要读取的表名
  9. val options = Map(
  10. "kudu.master" -> kuduMaster,
  11. "kudu.table" -> tableName
  12. )
  13. sparkSession.read.options(options).kudu.show()
  14. }

二、 DataFrameApi写数据到kudu表中

在通过DataFrame API编写时,目前只支持一种模式“append”。尚未实现的“覆盖”模式。

  • 代码示例

  
  1. /**
  2. * 6)DataFrameApi写数据到kudu表中
  3. */
  4. def dataFrame2Kudu(session: SparkSession, kuduContext: KuduContext): Unit ={
  5. val data = List(person(3, "canglaoshi", 14, 0), person(4, "xiaowang", 18, 1))
  6. import session.implicits._
  7. val dataFrame = data.toDF
  8. //目前,在kudu中,数据的写入只支持append追加
  9. dataFrame.write.mode("append").options(kuduOptions).kudu
  10. //查看结果
  11. //导包
  12. import org.apache.kudu.spark.kudu._
  13. //加载表的数据,导包调用kudu方法,转换为dataFrame,最后在使用show方法显示结果
  14. sparkSession.read.options(kuduOptions).kudu.show()
  15. }

三、​​​​​​​使用sparksql操作kudu表

可以选择使用Spark SQL直接使用INSERT语句写入Kudu表;与'append'类似,INSERT语句实际上将默认使用 UPSERT语义处理;

  • 代码示例

  
  1. /**
  2. * 使用sparksql操作kudu表
  3. * @param sparkSession
  4. * @param sc
  5. * @param kuduMaster
  6. * @param tableName
  7. */
  8. def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {
  9. //定义map集合,封装kudu的master地址和表名
  10. val options = Map(
  11. "kudu.master" -> kuduMaster,
  12. "kudu.table" -> tableName
  13. )
  14. val data = List(persont(10, "小张", 30, 0), person(11, "小王", 40, 0))
  15. import sparkSession.implicits._
  16. val dataFrame: DataFrame = sc.parallelize(data).toDF
  17. //把dataFrame注册成一张表
  18. dataFrame.createTempView("temp1")
  19. //获取kudu表中的数据,然后注册成一张表
  20. sparkSession.read.options(options).kudu.createTempView("temp2")
  21. //使用sparkSQL的insert操作插入数据
  22. sparkSession.sql("insert into table temp2 select * from temp1")
  23. sparkSession.sql("select * from temp2 where age >30").show()
  24. }

 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/123030789

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。