2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform

举报
Lansonli 发表于 2021/09/29 00:22:30 2021/09/29
【摘要】 目录 SparkStreaming实战案例五 TopN-transform 需求 注意: ​​​​​​​代码实现 SparkStreaming实战案例五 TopN-transform 需求 使用窗口计算模拟热搜排行榜: 每隔10s计算最近20s的热搜排行榜!   注意: DStream没有直接排序的...

目录

SparkStreaming实战案例五 TopN-transform

需求

注意:

​​​​​​​代码实现


SparkStreaming实战案例五 TopN-transform

需求

使用窗口计算模拟热搜排行榜:

每隔10s计算最近20s的热搜排行榜!

 

注意:

DStream没有直接排序的方法!所以应该调用transform方法对DStream底层的RDD进行操作,调用RDD的排序方法!

transform(函数),该函数会作用到DStream底层的RDD上!

 

​​​​​​​代码实现


  
  1. package cn.itcast.streaming
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. /**
  7.  * 使用SparkStreaming接收Socket数据,node01:9999
  8.  * 使用窗口计算模拟热搜排行榜:
  9.  * 每隔10s计算最近20s的热搜排行榜!
  10.  */
  11. object SparkStreamingDemo05_TopN {
  12.   def main(args: Array[String]): Unit = {
  13.     //1.创建环境
  14.     val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
  15.     val sc: SparkContext = new SparkContext(conf)
  16.     sc.setLogLevel("WARN")
  17.     val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
  18.     ssc.checkpoint("./ckp")
  19.     //2.接收socket数据
  20.     val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)
  21.     //3.做WordCount
  22.     val wordAndCountDS: DStream[(String, Int)] = linesDS
  23.       .flatMap(_.split(" "))
  24.       .map((_, 1))
  25.       //windowDuration:窗口长度:就算最近多久的数据,必须都是微批间隔的整数倍
  26.       //slideDuration :滑动间隔:就是每隔多久计算一次,,必须都是微批间隔的整数倍
  27.       //每隔10s(slideDuration :滑动间隔)计算最近20s(windowDuration:窗口长度)的热搜排行榜!
  28.       .reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Seconds(20),Seconds(10))
  29.     //排序取TopN
  30.     //注意:DStream没有直接排序的方法!所以应该调用DStream底层的RDD的排序方法!
  31.     //transform(函数),该函数会作用到DStream底层的RDD上!
  32.     val resultDS: DStream[(String, Int)] = wordAndCountDS.transform(rdd => {
  33.       val sortedRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
  34.       val top3: Array[(String, Int)] = sortedRDD.take(3) //取出当前RDD中排好序的前3个热搜词!
  35.       println("======top3--start======")
  36.       top3.foreach(println)
  37.       println("======top3--end======")
  38.       sortedRDD
  39.     })
  40.     //4.输出
  41.     resultDS.print()
  42.     //5.启动并等待程序停止
  43.     ssc.start()
  44.     ssc.awaitTermination()
  45.     ssc.stop(stopSparkContext = true, stopGracefully = true)
  46.   }
  47. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/115986020

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。