六十四、Spark-分别统计各个单词个数及特殊字符总个数
共享变量
广播变量(Broadcast Variables):广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本,简单理解:减少内存,减小计算压力;
累加器(Accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);
需求说明:以词频统计WordCount程序为例,处理特殊数据,包括非单词符号,做WordCount的同时统计出特殊字符的数量
目录
原数据展示
注:原数据杂乱无章,与所需单词混淆,间隔且不等
业务逻辑
1、创建本地环境,并设置日志提示级别
val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN")2、创建累加器
val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
3、定义特殊字符集合
val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")
4、将集合作为广播变量广播到各个节点
val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
5、加载数据,创建RDD
val lines: RDD[String] = sc.textFile("data/input/words2.txt")
6、过滤筛选
val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_)) .flatMap(_.split("\\s+")) .filter(ch => { //获取广播数据 val list: List[String] = broadcast.value if (list.contains(ch)) { //特殊字符 mycounter.add(1) false } else { //单词 true } }).map((_, 1)) .reduceByKey(_ + _)7、输出单词统计及特殊字符
wordcountResult.foreach(println) val chResult: lang.Long = mycounter.value println("特殊字符的数量:" + chResult)
完整代码
-
package org.example.spark
-
-
import java.lang
-
import org.apache.commons.lang3.StringUtils
-
import org.apache.spark.broadcast.Broadcast
-
import org.apache.spark.rdd.RDD
-
import org.apache.spark.util.LongAccumulator
-
import org.apache.spark.{SparkConf, SparkContext}
-
-
/**
-
* Author tuomasi
-
* Desc 演示RDD的共享变量
-
*/
-
object RDD_ShareVariable {
-
def main(args: Array[String]): Unit = {
-
//TODO 0.env/创建环境
-
val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
-
val sc: SparkContext = new SparkContext(conf)
-
sc.setLogLevel("WARN")
-
-
//创建计数器/累加器
-
val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
-
//定义一个特殊字符集合
-
val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")
-
//将集合作为广播变量广播到各个节点
-
val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
-
-
//TODO 1.source/加载数据/创建RDD
-
val lines: RDD[String] = sc.textFile("data/input/words2.txt")
-
-
//TODO 2.transformation
-
val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
-
.flatMap(_.split("\\s+"))
-
.filter(ch => {
-
//获取广播数据
-
val list: List[String] = broadcast.value
-
if (list.contains(ch)) { //特殊字符
-
mycounter.add(1)
-
false
-
} else { //单词
-
true
-
}
-
}).map((_, 1))
-
.reduceByKey(_ + _)
-
-
//TODO 3.sink/输出
-
wordcountResult.foreach(println)
-
val chResult: lang.Long = mycounter.value
-
println("特殊字符的数量:" + chResult)
-
}
-
}
程序运行
原数据:
控制台打印:
注:通过对比,该程序实现了单词与特殊字符的分别统计
项目总结
使用广播变量的好处:
1、Driver每次分发任务的时候会把task和计算逻辑的变量发送给Executor。不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。这样会导致消耗大量的内存导致严重的后果。
2、使用广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本。这样的话, 就可以让变量产生的副本大大减少;
文章来源: tuomasi.blog.csdn.net,作者:托马斯-酷涛,版权归原作者所有,如需转载,请联系作者。
原文链接:tuomasi.blog.csdn.net/article/details/122699912
- 点赞
- 收藏
- 关注作者
评论(0)