《Spark Streaming实时流式大数据处理实战》 ——3.8 实例——Spark RDD操作
3.8 实例——Spark RDD操作
2.2节中提到过,将两个文本文件分别包含用户名、地址,以及用户名、电话,对这两个文件利用Spark RDD分别进行了多种操作,在本章介绍完所有的RDD操作类型及RDD的依赖后,我们将实现这个实例。建议读者在读到这里后,也跟着笔者一步步实现这个实例,加深对Spark分布式系统的理解,为后续第3篇的Spark Streaming案例实战打下基础,下面我们一步步开始实现。
读者首先翻到之前的第2章,回顾一下图2.3中对Spark RDD的多个操作流程,程序中的代码与图2.3是完全对应的。同样,我们需要先在scala-eclipse中建立一个simple-maven项目,这里与2.4节实例类似,此处不再赘述。
在本实例中,我们需要用到Spark,以及一些输出日志的依赖包,另外需要通过Maven插件将所有代码依赖打包成一个完整的jar包并上传到Spark集群中运行,Maven的依赖项如下:
<groupId>com</groupId><!--组织名-->
<artifactId>rddOperation</artifactId><!--项目名-->
<version>0.1</version><!--版本号-->
<dependencies>
<dependency> <!--Spark核心依赖包 -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope><!--运行时提供,打包不添加,Spark集群已自带-->
</dependency>
<dependency><!--Log日志依赖包 -->
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency><!--日志依赖接口-->
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.12</version>
</dependency>
</dependencies>
关于打包的插件项及配置如下,注意对mainClass的修改:
<build>
<plugins>
<!--混合Scala/Java编译-->
<plugin><!--Scala编译插件-->
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source><!--设置Java源-->
<target>1.7</target>
</configuration>
</plugin>
<!-- for fatjar -->
<plugin><!--将所有依赖包打入同一个jar包-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef><!--jar包的后缀名-->
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!--添加类路径-->
<addClasspath>true</addClasspath>
<!--设置程序的入口类-->
<mainClass>sparkstreaming_action.rdd.operation.RDDOperation
</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
在配置好依赖项后,新建一个package,命名为sparkstreaming_action.rdd.operation,之后在该包下新建一个Scala Object,命名为RDDOperation,完成后如图3.9所示。
图3.9 Spark RDD操作实例项目图
现在进入RDDOperation文件,编写本次实例的代码,具体代码如下:
package sparkstreaming_action.rdd.operation
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object RDDOperation extends App {
// Spark配置项
val conf = new SparkConf()
.setAppName("rddOperation")
.setMaster("spark://127.0.0.1:7077")
// 创建Spark上下文
val sc = new SparkContext(conf)
// 文件名,读者可根据自己的路径进行修改
val txtNameAddr = "name_addr.txt"
val txtNamePhone = "name_phone.txt"
// 读入用户地址文件,并按照格式切分,得到对应RDD
val rddNameAddr = sc.textFile(txtNameAddr).map(record => {
val tokens = record.split(" ")
(tokens(0), tokens(1))
}) // RDD1
rddNameAddr.cache
// 读入用户电话文件,切分后得到对应的RDD
val rddNamePhone = sc.textFile(txtNamePhone).map(record => {
val tokens = record.split(" ")
(tokens(0), tokens(1))
}) // RDD2
rddNamePhone.cache
// 以用户名的key进行join操作
val rddNameAddrPhone = rddNameAddr.join(rddNamePhone) // RDD 3
// 对用户电话RDD进行HTML格式变化,产生新的RDD
val rddHtml = rddNameAddrPhone.map(record => {
val name = record._1
val addr = record._2._1
val phone = record._2._2
s"<h2>姓名:${name}</h2><p>地址:${addr}</p><p>电话:${phone}</p>"
}) // RDD4
// 输出操作
val rddOutput = rddHtml.saveAsTextFile("UserInfo")
// 根据地址格式得到邮编RDD
val rddPostcode = rddNameAddr.map(record => {
val postcode = record._2.split("#")(1)
(postcode, 1)
}) // RDD5
// 汇总邮编出现次数
val rddPostcodeCount = rddPostcode.reduceByKey(_ + _) // rdd6
// 打印结果
rddPostcodeCount.collect().foreach(println)
sc.stop
}
在上面的实例中,根据RDD所包含的数据对变量进行命名,同时在注释中清晰地注释了RDD 1、RDD2等,对应了图2.3中的每个RDD。我们首先从包含用户名、地址的文件和包含用户名、电话的两个文本文件中读入两个RDD,并在读入的过程中,分别对数据记录进行了预处理,将记录切分成了key/value的形式,即产生了RDD1和RDD2,其中key是用户名,而value分别是地址和电话。
然后通过join操作,将RDD1和RDD2两者利用key值合并在一起,并通过map函数将其映射成为HTML标签字符串的RDD4,最后通过saveAsTextFile输出到外部文件中。
另一方面,将RDD1中的地址,利用地址中的#符号进行切割,得到邮编,并以(邮编,1)的key/value形式返回得到RDD5,通过reduceByKey操作,合并key值相同的记录,最后利用collect操作将所有记录收集到Driver节点,并利用println函数打印输出。
我们构造两个模拟数据name_addr.txt和name_phone.txt,作为整个程序的输入,观察程序的效果,文件如下:
$ cat name_addr.txt
bob shanghai#200000
amy beijing#100000
alice shanghai#200000
tom beijing#100000
lulu hangzhou#310000
nick shanghai#200000
$ cat name_phone.txt
bob 15700079421
amy 18700079458
alice 17730079427
tom 16700379451
lulu 18800074423
nick 14400033426
在name_addr.txt文件中,利用空格将用户名和地址隔开,而地址使用#将城市和邮编隔开;在name_phone.txt文件中,利用空格将用户名和电话隔开。
利用mvn clean install命令对整个项目进行编译后,通过如下命令运行整个代码:
$ {your_path}/spark-2.2.0-bin-hadoop2.7/bin/spark-submit \
--class sparkstreaming_action.rdd.operation.RDDOperation \
--num-executors 4 \
--driver-memory 1G \
--executor-memory 1g \
--executor-cores 1 \
--conf spark.default.parallelism=1000 \
target/rddOperation-0.1-jar-with-dependencies.jar
执行上述命令运行结束后,我们可以看到命令行打印出如下结果:
(100000,2)
(200000,3)
(310000,1)
观察上面的输入文件,上海3人、北京2人、杭州1人,是符合预期的。另外,项目根目录会产生一个UserInfo文件夹,进入该文件夹会发现有大量的输出,除了有用的信息外,Spark还输出了大量空文件,可以利用如下命令,查看所有的文件内容:
$ cat *
<h2>姓名:tom</h2><p>地址:beijing#100000</p><p>电话:16700379451</p>
<h2>姓名:alice</h2><p>地址:shanghai#200000</p><p>电话:17730079427</p>
<h2>姓名:nick</h2><p>地址:shanghai#200000</p><p>电话:14400033426</p>
<h2>姓名:lulu</h2><p>地址:hangzhou#310000</p><p>电话:18800074423</p>
<h2>姓名:amy</h2><p>地址:beijing#100000</p><p>电话:18700079458</p>
<h2>姓名:bob</h2><p>地址:shanghai#200000</p><p>电话:15700079421</p>
与上面的输入文件进行对比可以发现,join操作已经将key值相同的用户名整合在一起,也按照我们的预期,将HTML代码拼接好,输出到外部文件中。
- 点赞
- 收藏
- 关注作者
评论(0)