2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展

举报
Lansonli 发表于 2021/09/28 23:57:11 2021/09/28
【摘要】 目录 SparkStreaming实战案例三 状态恢复-扩展 需求 代码实现 SparkStreaming实战案例三 状态恢复-扩展 需求 在上面的基础之上 实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加 如: 先发送spark,得到spark,1 再发送spa...

目录

SparkStreaming实战案例三 状态恢复-扩展

需求

代码实现


SparkStreaming实战案例三 状态恢复-扩展

需求

在上面的基础之上

实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加

如:

先发送spark,得到spark,1

再发送spark,得到spark,2

再停止程序,然后重新启动

再发送spark,得到spark,3

代码实现


  
  1. package cn.it.streaming
  2. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  3. import org.apache.spark.streaming.{Seconds, StreamingContext}
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. /**
  6.  * 使用SparkStreaming接收Socket数据,node01:9999
  7.  * 实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加
  8.  * 如:
  9.  * 先发送spark,得到spark,1
  10.  * 再发送spark,得到spark,2
  11.  * 再停止程序,然后重新启动
  12.  * 再发送spark,得到spark,3
  13.  */
  14. object SparkStreamingDemo03_StateRecovery {
  15.   val ckpdir = "./ckp"
  16.   def createStreamingContextFunction:StreamingContext={
  17.     //1.创建环境
  18.     val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
  19.     val sc: SparkContext = new SparkContext(conf)
  20.     sc.setLogLevel("WARN")
  21.     val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
  22.     ssc.checkpoint(ckpdir)
  23.     //2.接收socket数据
  24.     val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
  25.     //3.做WordCount
  26.     val updateFunc= (currentValues:Seq[Int],historyValue:Option[Int])=>{
  27.       //将当前批次的数据和历史数据进行合并作为这一次新的结果!
  28.       val newValue: Int = currentValues.sum + historyValue.getOrElse(0)//getOrElse(默认值)
  29.       Option(newValue)
  30.     }
  31.     val resultDS: DStream[(String, Int)] = linesDS
  32.       .flatMap(_.split(" "))
  33.       .map((_, 1))
  34.       .updateStateByKey(updateFunc)
  35.     //4.输出
  36.     resultDS.print()
  37.     ssc
  38.   }
  39.   def main(args: Array[String]): Unit = {
  40.     val ssc: StreamingContext = StreamingContext.getOrCreate(ckpdir,createStreamingContextFunction _)
  41.     val sc: SparkContext = ssc.sparkContext
  42.     sc.setLogLevel("WARN")
  43.     //5.启动并等待程序停止
  44.     ssc.start()
  45.     ssc.awaitTermination()
  46.     ssc.stop(stopSparkContext = true, stopGracefully = true)
  47.   }
  48. }

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/115985662

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。