2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey

举报
Lansonli 发表于 2021/09/29 01:23:09 2021/09/29
【摘要】 目录 SparkStreaming实战案例二 UpdateStateByKey 需求 1.updateStateByKey 2.mapWithState 代码实现 SparkStreaming实战案例二 UpdateStateByKey 需求 对从Socket接收的数据做WordCount并要求能够和历史数据进行累加!...

目录

SparkStreaming实战案例二 UpdateStateByKey

需求

1.updateStateByKey

2.mapWithState

代码实现


SparkStreaming实战案例二 UpdateStateByKey

需求

对从Socket接收的数据做WordCount并要求能够和历史数据进行累加!

如:

先发了一个spark,得到spark,1

然后不管隔多久再发一个spark,得到spark,2

也就是说要对数据的历史状态进行维护!

 

 

注意:可以使用如下API对状态进行维护

1.updateStateByKey

统计全局的key的状态,但是就算没有数据输入,他也会在每一个批次的时候返回之前的key的状态。假设5s产生一个批次的数据,那么5s的时候就会更新一次的key的值,然后返回。

这样的缺点就是,如果数据量太大的话,而且我们需要checkpoint数据,这样会占用较大的存储。

如果要使用updateStateByKey,就需要设置一个checkpoint目录,开启checkpoint机制。因为key的state是在内存维护的,如果宕机,则重启之后之前维护的状态就没有了,所以要长期保存它的话需要启用checkpoint,以便恢复数据。

 

2.mapWithState

也是用于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,有一点增量的感觉。

这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储。

 

代码实现


  
  1. package cn.itcast.streaming
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  4. import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. /**
  7.  * 使用SparkStreaming接收Socket数据,node01:9999
  8.  * 对从Socket接收的数据做WordCount并要求能够和历史数据进行累加!
  9.  * 如:
  10.  * 先发了一个spark,得到spark,1
  11.  * 然后不管隔多久再发一个spark,得到spark,2
  12.  * 也就是说要对数据的历史状态进行维护!
  13.  */
  14. object SparkStreamingDemo02_UpdateStateByKey {
  15.   def main(args: Array[String]): Unit = {
  16.     //1.创建环境
  17.     val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
  18.     val sc: SparkContext = new SparkContext(conf)
  19.     sc.setLogLevel("WARN")
  20.     //batchDuration the time interval at which streaming data will be divided into batches
  21.     //流数据将被划分为批的时间间隔,就是每隔多久对流数据进行一次微批划分!
  22.     val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
  23.     // The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()
  24.     //注意:因为涉及到历史数据/历史状态,也就是需要将历史数据/状态和当前数据进行合并,作为新的Value!
  25.     //那么新的Value要作为下一次的历史数据/历史状态,那么应该搞一个地方存起来!
  26.     //所以需要设置一个Checkpoint目录!
  27.     ssc.checkpoint("./ckp")
  28.     //2.接收socket数据
  29.     val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)
  30.     //3.做WordCount
  31.     //======================updateStateByKey=======================
  32.     //val 函数名称 :(参数类型)=>函数返回值类型 = (参数名称:参数类型)=>{函数体}
  33.     //参数1:Seq[Int]:当前批次的数据,如发送了2个spark,那么key为spark,参数1为:Seq[1,1]
  34.     //参数2:Option[Int]:上一次该key的历史值!注意:历史值可能有可能没有!如果没有默认值应该为0,如果有就取出来
  35.     //返回值:Option[Int]:当前批次的值+历史值!
  36.     //Option表示:可能有Some可能没有None
  37.     val updateFunc= (currentValues:Seq[Int],historyValue:Option[Int])=>{
  38.       //将当前批次的数据和历史数据进行合并作为这一次新的结果!
  39.       if (currentValues.size > 0) {
  40.         val newValue: Int = currentValues.sum + historyValue.getOrElse(0)//getOrElse(默认值)
  41.         Option(newValue)
  42.       }else{
  43.         historyValue
  44.       }
  45.     }
  46.     val resultDS: DStream[(String, Int)] = linesDS
  47.       .flatMap(_.split(" "))
  48.       .map((_, 1))
  49.       //.reduceByKey(_ + _)
  50.       // updateFunc: (Seq[V], Option[S]) => Option[S]
  51.       .updateStateByKey(updateFunc)
  52.     //======================mapWithState=======================
  53.     //Spark 1.6提供新的状态更新函数【mapWithState】,mapWithState函数也会统计全局的key的状态,
  54.     //但是如果没有数据输入,便不会返回之前的key的状态,只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。
  55.     val mappingFunc = (word: String, current: Option[Int], state: State[Int]) => {
  56.       val newCount = current.getOrElse(0) + state.getOption.getOrElse(0)
  57.       val output = (word, newCount)
  58.       state.update(newCount)
  59.       output
  60.     }
  61.     val resultDS2 = linesDS
  62.       .flatMap(_.split(" "))
  63.       .map((_, 1))
  64.       .mapWithState(StateSpec.function(mappingFunc))
  65.     //4.输出
  66.     resultDS.print()
  67.     resultDS2.print()
  68.     //5.启动并等待程序停止
  69.     // 对于流式应用来说,需要启动应用
  70.     ssc.start()
  71.     // 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
  72.     ssc.awaitTermination()
  73.     // 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
  74.     ssc.stop(stopSparkContext = true, stopGracefully = true)
  75.   }
  76. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/115985521

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。