《Spark Streaming实时流式大数据处理实战》 ——2.4 实例——Spark文件词频统计
2.4 实例——Spark文件词频统计
通过前面的介绍,我们搭建好了Spark集群(单机同机部署)和开发环境,下面就进行一次Spark的实例演习。这个Demo的流程如下:
从文本文件中读入英文句子,对其中的英文单词进行归纳统计,并输出每个英文单词的出现频率,如图2.20所示。
?注:这个看似简单的任务其实在大规模数据处理中是非常实用的,对于词频的统计在舆情分析和自然语言处理等领域都有着广泛的应用,后续章节中我们会利用流式处理Spark Streaminng的方式对数据源进行词频统计。
图2.20 WordFreq_Spark原理图
首先对原始文本进行一次flatMap(转移操作),展开成类似数组元素的形式存储,之后做一次一对一映射,将每个词映射成为词和它出现的次数构成的二元组,最后做一次reduceByKey操作。这里的Key便是每个词,将多个重复词合并,并将出现次数求和,从而得到每个词在输入文本中的出现频率。下面我们来一步步实现这个Spark文件词频统计程序。
(1)创建一个Spark词频统计的项目,并跳过archetype项目模板的选择,将项目命名为wordFreqFileSpark,如图2.21所示。
图2.21 创建基于Maven的Scala项目
在随后的界面中对项目的名称、包名及版本号进行简单设置,如图2.22所示。
图2.22 Maven项目设置
在设置好后,IDE会自动生成项目结构,不过我们需要将目录名重命名为src/main/scala。
(2)对Maven项目添加Scala属性,步骤为选择:Right click on project→configure→Add Scala Nature选项。
(3)打开项目的pom.xml文件,可以看到我们设置的Maven项目的基本信息:
<modelVersion>4.0.0</modelVersion>
<groupId>com</groupId><!—组织名-->
<artifactId>wordFreqFileSpark</artifactId><!—项目名-->
<version>0.1</version><!—版本号-->
<dependencies>
之后添加pom.xml中的依赖包,添加spark、log及编译工具等,代码如下:
<dependencies>
<dependency> <!--Spark依赖包 -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency><!--Log日志依赖包 -->
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency><!--日志依赖接口-->
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.12</version>
</dependency>
</dependencies>
我们添加Spark的依赖包,在pom.xml中还需要配置build插件,添加一个用于混合Scala和Java编译的插件maven-scala-plugin。另外,还添加了一个用于将所有依赖包都编译到一个jar包中的插件maven-assembly-plugin,这样就能有一个完整的jar包,提交给Spark集群进行运算,代码如下:
<build>
<plugins>
<!--混合scala/java编译-->
<plugin><!--scala编译插件-->
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source><!--设置Java源-->
<target>1.7</target>
</configuration>
</plugin>
<!-- for fatjar -->
<plugin><!--将所有依赖包打入同一个jar包中-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef><!—jar包的后缀名-->
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<!--设置程序的入口类-->
<mainClass>com.sparkstreaming.action.main.WordFreq</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
(4)从Build Path中移除Scala Library(由于在Maven中添加了Spark Core的依赖项,而Spark是依赖于Scala的,Scala的jar包已经存在于Maven Dependency中)。
(5)添加package包sparkstreaming.action.wordfreq。
(6)创建scala object并命名为WordFreq。
在做完以上准备工作后,我们的目标是从文本中读入数据,利用空格进行数据切分后,按照<词,1>的key、value形式映射后,最终以key为reduce对象,整合每个词出现的次数,核心代码如下:
package sparkstreaming.action.wordfreq.main
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordFreq {
def main(args: Array[String]) {
// 创建Spark上下文环境
val conf = new SparkConf()
.setAppName("WordFreq_Spark")
.setMaster("spark://127.0.0.1:7077")
// 创建Spark上下文
val sc = new SparkContext(conf)
// 文本文件名,读者可根据自己的路径进行修改
val txtFile = "input.txt"
// 读取文本文件
val txtData = sc.textFile(txtFile)
// 缓存文本RDD
txtData.cache()
// 计数
txtData.count()
// 以空格分割进行词频统计
val wcData = txtData.flatMap { line => line.split(" ") }
.map { word => (word, 1) }
.reduceByKey(_ + _)
// 汇总RDD信息并打印
wcData.collect().foreach(println)
sc.stop
}
}
对上面所述的Spark代码下面进行逐句分析,帮助读者理解:
(1)对于每个Spark程序而言,都需要依赖于SparkContext进行,我们可以通过SparkConf对其进行配置,比如上面这段程序,我们配置了程序名为WordFreq_Spark及Master节点的IP地址。
(2)我们从input.txt的文本文件中将数据读入,并且对其进行了cache和count操作。
(3)这一步是词频统计的关键一步,反映了图2.20所示的整个流程,利用flatMap以空格为切分将输入的文本映射为一个向量,之后用map将每个元素映射为(元素,词频)的二元组,最后以每个词元素为key进行reduce(合并)操作,从而统计出每个词出现的词频(注:这一步是分散在集群当中的所有Worker中执行的,即每个Worker可能只计算了一小部分)。
(4)利用collect操作将每个Worker节点的计算结果汇总到Driver节点,对结果进行输出,并停止Spark上下文环境。
将代码保存好之后,在项目目录执行mvn install对代码进行编译,编译成功后会产生一个target文件,文件中有编译好的两个jar包,我们需要的是xxxjar-with-dependencies来提交给Spark集群运行,代码如下:
$ ll target/
drwxr-xr-x 10 xiaolitao staff 320 7 19 16:34 ./
drwxr-xr-x 12 xiaolitao staff 384 11 18 13:10 ../
-rw-r--r-- 1 xiaolitao staff 538636 11 16 2017 WordFreq_Spark-0.1-jar-
with-dependencies.jar
-rw-r--r-- 1 xiaolitao staff 8055 11 16 2017 WordFreq_Spark-0.1.jar
drwxr-xr-x 2 xiaolitao staff 64 11 16 2017 archive-tmp/
drwxr-xr-x 4 xiaolitao staff 128 11 18 13:10 classes/
-rw-r--r-- 1 xiaolitao staff 1 11 16 2017 classes.timestamp
drwxr-xr-x 3 xiaolitao staff 96 11 16 2017 maven-archiver/
drwxr-xr-x 3 xiaolitao staff 96 11 16 2017 maven-status/
drwxr-xr-x 2 xiaolitao staff 64 11 16 2017 test-classes/
之后利用下面的命令将编译好的jar包提交到Spark集群运行,代码如下:
./spark-2.2.0-bin-hadoop2.7/bin/spark-submit \
--class sparkstreaming_action.wordfreq.main.WordFreq \
--num-executors 4 \
--driver-memory 1G \
--executor-memory 1g \
--executor-cores 1 \
--conf spark.default.parallelism=1000 \
target/WordFreq_Spark-0.1-jar-with-dependencies.jar
设置执行的Executor数为4,Driver进程的大小为1g,然后再设置每个Executor的内存大小和占用核数。
进入2.3.4节讲解的Spark UI监控界面,如图2.23所示。
图2.23 WordFreq Spark Master监控页面
在网页最顶端,可以看到整个Spark集群的基本情况,包括Worker数,Core的数量、内存总量及当前状态等内容;再往下可以看到正在运行的程序就是我们刚提交的WordFreq_Spark程序,以及其占用的内存数和核数;最下面可以看到过往的提交历史记录。
另外,还可以单击Worker,查看每个Worker的详细运行情况。如图2.24所示,可以看到Worker中Executor的运行情况。
?注:这里我们看到的是Removed Executors,是因为笔者在截图的时候程序已经运行结束,笔者是从历史记录中截取的。
如果一切顺利,将会看到Console中打印出每个词的词频数量,恭喜你已经成功迈入了Spark的世界,运行结果如下:
$ ./run.sh
(data,1)
(learn,1)
(I,2)
(use,1)
(can,1)
(process,1)
(spark,1)
(it,1)
(big,1)
(stremaing,1)
(to,2)
(want,1)
图2.24 WordFreq Spark Worker监控页面
- 点赞
- 收藏
- 关注作者
评论(0)