2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展

举报
Lansonli 发表于 2021/09/28 23:57:11 2021/09/28
1.8k+ 0 0
【摘要】 目录 SparkStreaming实战案例三 状态恢复-扩展 需求 代码实现 SparkStreaming实战案例三 状态恢复-扩展 需求 在上面的基础之上 实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加 如: 先发送spark,得到spark,1 再发送spa...

目录

SparkStreaming实战案例三 状态恢复-扩展

需求

代码实现


SparkStreaming实战案例三 状态恢复-扩展

需求

在上面的基础之上

实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加

如:

先发送spark,得到spark,1

再发送spark,得到spark,2

再停止程序,然后重新启动

再发送spark,得到spark,3

代码实现


      package cn.it.streaming
      import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.{SparkConf, SparkContext}
      /**
       * 使用SparkStreaming接收Socket数据,node01:9999
       * 实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加
       * 如:
       * 先发送spark,得到spark,1
       * 再发送spark,得到spark,2
       * 再停止程序,然后重新启动
       * 再发送spark,得到spark,3
       */
      object SparkStreamingDemo03_StateRecovery {
        val ckpdir = "./ckp"
        def createStreamingContextFunction:StreamingContext={
          //1.创建环境
          val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
          val sc: SparkContext = new SparkContext(conf)
          sc.setLogLevel("WARN")
          val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
          ssc.checkpoint(ckpdir)
          //2.接收socket数据
          val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
          //3.做WordCount
          val updateFunc= (currentValues:Seq[Int],historyValue:Option[Int])=>{
            //将当前批次的数据和历史数据进行合并作为这一次新的结果!
            val newValue: Int = currentValues.sum + historyValue.getOrElse(0)//getOrElse(默认值)
            Option(newValue)
          }
          val resultDS: DStream[(String, Int)] = linesDS
            .flatMap(_.split(" "))
            .map((_, 1))
            .updateStateByKey(updateFunc)
          //4.输出
          resultDS.print()
          ssc
        }
        def main(args: Array[String]): Unit = {
          val ssc: StreamingContext = StreamingContext.getOrCreate(ckpdir,createStreamingContextFunction _)
          val sc: SparkContext = ssc.sparkContext
          sc.setLogLevel("WARN")
          //5.启动并等待程序停止
          ssc.start()
          ssc.awaitTermination()
          ssc.stop(stopSparkContext = true, stopGracefully = true)
        }
      }
  
 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/115985662

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。