2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
【摘要】
目录
SparkStreaming实战案例三 状态恢复-扩展
需求
代码实现
SparkStreaming实战案例三 状态恢复-扩展
需求
在上面的基础之上
实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加
如:
先发送spark,得到spark,1
再发送spa...
目录
SparkStreaming实战案例三 状态恢复-扩展
需求
在上面的基础之上
实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加
如:
先发送spark,得到spark,1
再发送spark,得到spark,2
再停止程序,然后重新启动
再发送spark,得到spark,3
代码实现
-
package cn.it.streaming
-
-
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
-
import org.apache.spark.streaming.{Seconds, StreamingContext}
-
import org.apache.spark.{SparkConf, SparkContext}
-
-
/**
-
* 使用SparkStreaming接收Socket数据,node01:9999
-
* 实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加
-
* 如:
-
* 先发送spark,得到spark,1
-
* 再发送spark,得到spark,2
-
* 再停止程序,然后重新启动
-
* 再发送spark,得到spark,3
-
*/
-
object SparkStreamingDemo03_StateRecovery {
-
val ckpdir = "./ckp"
-
-
def createStreamingContextFunction:StreamingContext={
-
//1.创建环境
-
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
-
val sc: SparkContext = new SparkContext(conf)
-
sc.setLogLevel("WARN")
-
val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
-
ssc.checkpoint(ckpdir)
-
-
//2.接收socket数据
-
val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
-
-
//3.做WordCount
-
val updateFunc= (currentValues:Seq[Int],historyValue:Option[Int])=>{
-
//将当前批次的数据和历史数据进行合并作为这一次新的结果!
-
val newValue: Int = currentValues.sum + historyValue.getOrElse(0)//getOrElse(默认值)
-
Option(newValue)
-
}
-
-
val resultDS: DStream[(String, Int)] = linesDS
-
.flatMap(_.split(" "))
-
.map((_, 1))
-
.updateStateByKey(updateFunc)
-
-
//4.输出
-
resultDS.print()
-
-
ssc
-
}
-
-
def main(args: Array[String]): Unit = {
-
val ssc: StreamingContext = StreamingContext.getOrCreate(ckpdir,createStreamingContextFunction _)
-
val sc: SparkContext = ssc.sparkContext
-
sc.setLogLevel("WARN")
-
-
//5.启动并等待程序停止
-
ssc.start()
-
ssc.awaitTermination()
-
ssc.stop(stopSparkContext = true, stopGracefully = true)
-
}
-
}
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/115985662
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)