Spark Streaming 进阶实战五个例子

举报
小米粒-biubiubiu 发表于 2020/12/02 23:24:54 2020/12/02
【摘要】 一、带状态的算子:UpdateStateByKey 实现 计算 过去一段时间到当前时间 单词 出现的 频次 object StatefulWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[2]").setAppName(...

一、带状态的算子:UpdateStateByKey

实现 计算 过去一段时间到当前时间 单词 出现的 频次


  
  1. object StatefulWordCount {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
  4. val ssc = new StreamingContext(sparkConf, Seconds(5))
  5. //如果使用了 stateful 的算子,必须要设置 checkpoint,
  6. //因为老的值 必须要 存在 某个 目录下面,新的值 才能去更新老的值
  7. //在生产环境中,建议把checkpoint 设置到 hdfs 的某个文件夹中
  8. ssc.checkpoint(".")
  9. val lines = ssc.socketTextStream("localhost", 6789)
  10. val result = lines.flatMap(_.split(" ").map((_, 1)))
  11. val state = result.updateStateByKey[Int](updateFunction _)
  12. state.print()
  13. ssc.start()
  14. ssc.awaitTermination()
  15. }
  16. /**
  17. * 把当前的数据去更新已有的或者老的数据
  18. * @param currentValues 当前的
  19. * @param preValues 老的
  20. * @return
  21. */
  22. def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
  23. val current = currentValues.sum
  24. val pre = preValues.getOrElse(0)
  25. Some(current + pre)
  26. }
  27. }

 

 

二、实战:计算到目前为止累积出现的单词的个数写入到mysql中


  
  1. /**
  2. * 使用 spark streaming 完成 词频统计,并输出到 mysql 数据库
  3. * 创建 数据库
  4. *
  5. * 创建数据表
  6. * create table wordcount (
  7. * word varchar(50) default null,
  8. * wordcount int(10) default null
  9. * )
  10. */
  11. object ForeachRDDApp {
  12. def main(args: Array[String]): Unit = {
  13. val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")
  14. val ssc = new StreamingContext(sparkConf, Seconds(5))
  15. val lines = ssc.socketTextStream("localhost", 6789)
  16. val result = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)
  17. //result.print()
  18. //将结果写入到mysql
  19. //1、错误的方式
  20. // result.foreachRDD(rdd =>{
  21. // val connection = createConnection()
  22. // rdd.foreach {
  23. // record =>
  24. // val sql = "insert into wordcount (word,wordcount)" +
  25. // "values ('"+record._1+"','"+record._2+"')"
  26. // connection.createStatement().execute(sql)
  27. // }
  28. // })
  29. //2、正确的方式
  30. result.foreachRDD(rdd => {
  31. rdd.foreachPartition(partitionOfRecords => {
  32. if (partitionOfRecords.size > 0) {
  33. val connection = createConnection()
  34. partitionOfRecords.foreach(pair => {
  35. val sql = "insert into wordcount (word,wordcount)" +
  36. "values ('" + pair._1 + "','" + pair._2 + "')"
  37. connection.createStatement().execute(sql)
  38. })
  39. connection.close()
  40. }
  41. })
  42. })
  43. //3、更好的方式,查阅官方文档,使用 连接池的方式
  44. //存在的问题,这样每次都会插入新的数据,同样的单词频次字段不会去累加更新
  45. //解决方案 :每次 insert 之前,判断一下,该单词是否已经存在数据库中,如果已经存在则update
  46. //或者 存放在 hbase /redis 中,调用相应的api ,直接 插入和更新。
  47. ssc.start()
  48. ssc.awaitTermination()
  49. }
  50. def createConnection() = {
  51. Class.forName("com.mysql.jdbc.Driver")
  52. DriverManager.getConnection("jdbc://mysql://localhost:3306/dzx_spark", "root", "1234")
  53. }
  54. }

 

三、基于window 的统计

window :定时的进行一个时间段内的数据处理

window length : 窗口的长度

sliding interval : 窗口的间隔

这2个参数和我们的batch size  成倍数关系。如果不是倍数关系运行直接报错

每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc ==>每隔 sliding interval  统计 window length 的值

pair.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(10))

 

 

 

 

四、黑名单过滤


  
  1. /**
  2. * 黑名单过滤
  3. *
  4. * 访问日志 ==> DStream
  5. *
  6. * 20180808,zs
  7. * 20180808,ls
  8. * 20180808,ww
  9. *
  10. * ==> (zs:20180808,zs) (ls:20180808,ls)(ww:20180808,ww)
  11. *
  12. * 黑名单列表 ==》 RDD
  13. * zs ls
  14. * ==>(zs:true) (ls:true)
  15. *
  16. *
  17. * leftjoin
  18. * (zs:[<20180808,zs>,<true>]) pass ...
  19. * (ls:[<20180808,ls>,<true>]) pass ...
  20. * (ww:[<20180808,ww>,<false>]) ==> tuple1 ok...
  21. *
  22. *
  23. */
  24. object BlackNameListApp {
  25. def main(args: Array[String]): Unit = {
  26. val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")
  27. val ssc = new StreamingContext(sparkConf, Seconds(5))
  28. /**
  29. * 构建黑名单
  30. */
  31. val blacks = List("zs", "ls")
  32. val blacksRDD = ssc.sparkContext.parallelize(blacks)
  33. .map(x => (x, true))
  34. val lines = ssc.socketTextStream("localhost", 6789)
  35. val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
  36. rdd.leftOuterJoin(blacksRDD).filter(x => x._2._2.getOrElse(false) != true).map(x => x._2._1)
  37. })
  38. clicklog.print()
  39. ssc.start()
  40. ssc.awaitTermination()
  41. }
  42. }

 

五、 spark  streaming 整合 spark  sql  实战


  
  1. object SqlNetworkWordCount {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")
  4. val ssc = new StreamingContext(sparkConf, Seconds(5))
  5. val lines = ssc.socketTextStream("192.168.42.85", 6789)
  6. val words = lines.flatMap(_.split(" "))
  7. // Convert RDDs of the words DStream to DataFrame and run SQL query
  8. words.foreachRDD { (rdd: RDD[String], time: Time) =>
  9. // Get the singleton instance of SparkSession
  10. val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  11. import spark.implicits._
  12. // Convert RDD[String] to RDD[case class] to DataFrame
  13. val wordsDataFrame = rdd.map(w => Record(w)).toDF()
  14. // Creates a temporary view using the DataFrame
  15. wordsDataFrame.createOrReplaceTempView("words")
  16. // Do word count on table using SQL and print it
  17. val wordCountsDataFrame =
  18. spark.sql("select word, count(*) as total from words group by word")
  19. println(s"========= $time =========")
  20. wordCountsDataFrame.show()
  21. }
  22. ssc.start()
  23. ssc.awaitTermination()
  24. }
  25. }
  26. /** Case class for converting RDD to DataFrame */
  27. case class Record(word: String)
  28. /** Lazily instantiated singleton instance of SparkSession */
  29. object SparkSessionSingleton {
  30. @transient private var instance: SparkSession = _
  31. def getInstance(sparkConf: SparkConf): SparkSession = {
  32. if (instance == null) {
  33. instance = SparkSession
  34. .builder
  35. .config(sparkConf)
  36. .getOrCreate()
  37. }
  38. instance
  39. }
  40. }

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/85676609

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。