Spark Streaming 快速入门系列(6) | DStream的几种保存方式

举报
不温卜火 发表于 2021/01/07 19:09:12 2021/01/07
【摘要】   大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客...

  大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/

  本片博文为大家带来的是DStream的几种保存方式。
1


2
关于这部分我们还可以通过查看官方文档实现
http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#transformations-on-dstreams

  输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。

  与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。

  • 下列为输出操作的方法与解释

3
注意

  1. 连接不能写在driver层面(序列化);
  2. 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
  3. 增加foreachPartition,在分区创建(获取)。

1. 保存到文本文件

  • 1. 源码
package com.buwenbuhuo.spark.streaming.day02.output

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-12 20:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object OutDemo1 {
  def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo1").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck1") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\\W+")) .map((_,1)) .reduceByKey(_+_) .saveAsTextFiles("world","log") ssc.start() ssc.awaitTermination()
  }
}
  
 
  • 2. 打开端口进行测试
nc -lk 9999

  
 

4

  • 3. 运行结果

5
6

2. 保存到Mysql (第一种写法)

  • 1. 源码
package com.buwenbuhuo.spark.streaming.day02.output

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-12 21:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object OutDemo2 {
  val props: Properties = new Properties()
  props.setProperty("user","root")
  props.setProperty("password","199712") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo2").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck3") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\\W+")) .map((_,1)) .reduceByKey(_+_) .foreachRDD(rdd =>{ // 把rdd转成df // 1. 先创建sparkSession val spark: SparkSession = SparkSession.builder() .config(rdd.sparkContext.getConf) .getOrCreate() import spark.implicits._ // 2. 转换 val df: DataFrame = rdd.toDF("word","count") // 3. 写 df.write.mode("append").jdbc("jdbc:mysql://hadoop002:3306/rdd","window0812",props) }) ssc.start() ssc.awaitTermination()
  }
}
  
 
  • 2. 运行与写入数据

7

  • 3. 查看结果

8

3. 保存到Mysql (第二种写法)

  • 1. 源码
package com.buwenbuhuo.spark.streaming.day02.output

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-12 22:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object OutDemo3 {
  val props: Properties = new Properties()
  props.setProperty("user","root")
  props.setProperty("password","199712") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo3").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck3") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\\W+")) .map((_,1)) .updateStateByKey((seq:Seq[Int],opt:Option[Int]) => Some(seq.sum + opt.getOrElse(0))) .foreachRDD(rdd =>{ // 把rdd转成df // 1. 先创建sparkSession val spark: SparkSession = SparkSession.builder() .config(rdd.sparkContext.getConf) .getOrCreate() import spark.implicits._ // 2. 转换 val df: DataFrame = rdd.toDF("word","count") // 3. 写 df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop002:3306/rdd","window0813",props) }) ssc.start() ssc.awaitTermination()
  }
}
  
 
  • 2. 运行

9

  • 3. 运行结果

10
  本次的分享就到这里了,


14

  好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
  如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
  码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦!

15
16

文章来源: buwenbuhuo.blog.csdn.net,作者:不温卜火,版权归原作者所有,如需转载,请联系作者。

原文链接:buwenbuhuo.blog.csdn.net/article/details/107975540

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。