2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
【摘要】
目录
SparkStreaming实战案例三 状态恢复-扩展
需求
代码实现
SparkStreaming实战案例三 状态恢复-扩展
需求
在上面的基础之上
实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加
如:
先发送spark,得到spark,1
再发送spa...
目录
SparkStreaming实战案例三 状态恢复-扩展
需求
在上面的基础之上
实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加
如:
先发送spark,得到spark,1
再发送spark,得到spark,2
再停止程序,然后重新启动
再发送spark,得到spark,3
代码实现
package cn.it.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用SparkStreaming接收Socket数据,node01:9999
* 实现SparkStreaming程序停止之后再启动时还能够接着上次的结果进行累加
* 如:
* 先发送spark,得到spark,1
* 再发送spark,得到spark,2
* 再停止程序,然后重新启动
* 再发送spark,得到spark,3
*/
object SparkStreamingDemo03_StateRecovery {
val ckpdir = "./ckp"
def createStreamingContextFunction:StreamingContext={
//1.创建环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
ssc.checkpoint(ckpdir)
//2.接收socket数据
val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
//3.做WordCount
val updateFunc= (currentValues:Seq[Int],historyValue:Option[Int])=>{
//将当前批次的数据和历史数据进行合并作为这一次新的结果!
val newValue: Int = currentValues.sum + historyValue.getOrElse(0)//getOrElse(默认值)
Option(newValue)
}
val resultDS: DStream[(String, Int)] = linesDS
.flatMap(_.split(" "))
.map((_, 1))
.updateStateByKey(updateFunc)
//4.输出
resultDS.print()
ssc
}
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = StreamingContext.getOrCreate(ckpdir,createStreamingContextFunction _)
val sc: SparkContext = ssc.sparkContext
sc.setLogLevel("WARN")
//5.启动并等待程序停止
ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/115985662
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)