六十四、Spark-分别统计各个单词个数及特殊字符总个数

举报
托马斯-酷涛 发表于 2022/05/25 23:03:42 2022/05/25
【摘要】 共享变量 广播变量(Broadcast Variables):广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本,简单理解:减少内存,减小计算压力; 累加器(Accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和); ...

共享变量

广播变量(Broadcast Variables):广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本,简单理解:减少内存,减小计算压力;

累加器(Accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);

需求说明:以词频统计WordCount程序为例,处理特殊数据,包括非单词符号,做WordCount的同时统计出特殊字符的数量

目录

共享变量

原数据展示

业务逻辑

完整代码

程序运行

项目总结


原数据展示

         注:原数据杂乱无章,与所需单词混淆,间隔且不等

业务逻辑

1、创建本地环境,并设置日志提示级别


   
  1. val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
  2. val sc: SparkContext = new SparkContext(conf)
  3. sc.setLogLevel("WARN")

2、创建累加器

val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
  

3、定义特殊字符集合

val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")

  

4、将集合作为广播变量广播到各个节点

val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
  

5、加载数据,创建RDD

val lines: RDD[String] = sc.textFile("data/input/words2.txt")
  

6、过滤筛选


   
  1. val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
  2. .flatMap(_.split("\\s+"))
  3. .filter(ch => {
  4. //获取广播数据
  5. val list: List[String] = broadcast.value
  6. if (list.contains(ch)) { //特殊字符
  7. mycounter.add(1)
  8. false
  9. } else { //单词
  10. true
  11. }
  12. }).map((_, 1))
  13. .reduceByKey(_ + _)

7、输出单词统计及特殊字符


   
  1. wordcountResult.foreach(println)
  2. val chResult: lang.Long = mycounter.value
  3. println("特殊字符的数量:" + chResult)

完整代码


   
  1. package org.example.spark
  2. import java.lang
  3. import org.apache.commons.lang3.StringUtils
  4. import org.apache.spark.broadcast.Broadcast
  5. import org.apache.spark.rdd.RDD
  6. import org.apache.spark.util.LongAccumulator
  7. import org.apache.spark.{SparkConf, SparkContext}
  8. /**
  9. * Author tuomasi
  10. * Desc 演示RDD的共享变量
  11. */
  12. object RDD_ShareVariable {
  13. def main(args: Array[String]): Unit = {
  14. //TODO 0.env/创建环境
  15. val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
  16. val sc: SparkContext = new SparkContext(conf)
  17. sc.setLogLevel("WARN")
  18. //创建计数器/累加器
  19. val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
  20. //定义一个特殊字符集合
  21. val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")
  22. //将集合作为广播变量广播到各个节点
  23. val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
  24. //TODO 1.source/加载数据/创建RDD
  25. val lines: RDD[String] = sc.textFile("data/input/words2.txt")
  26. //TODO 2.transformation
  27. val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
  28. .flatMap(_.split("\\s+"))
  29. .filter(ch => {
  30. //获取广播数据
  31. val list: List[String] = broadcast.value
  32. if (list.contains(ch)) { //特殊字符
  33. mycounter.add(1)
  34. false
  35. } else { //单词
  36. true
  37. }
  38. }).map((_, 1))
  39. .reduceByKey(_ + _)
  40. //TODO 3.sink/输出
  41. wordcountResult.foreach(println)
  42. val chResult: lang.Long = mycounter.value
  43. println("特殊字符的数量:" + chResult)
  44. }
  45. }

程序运行

原数据:

控制台打印:

         注:通过对比,该程序实现了单词与特殊字符的分别统计

项目总结

使用广播变量的好处:

1、Driver每次分发任务的时候会把task和计算逻辑的变量发送给Executor。不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。这样会导致消耗大量的内存导致严重的后果。
2、使用广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本。这样的话, 就可以让变量产生的副本大大减少;

 

文章来源: tuomasi.blog.csdn.net,作者:托马斯-酷涛,版权归原作者所有,如需转载,请联系作者。

原文链接:tuomasi.blog.csdn.net/article/details/122699912

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。