《Spark Streaming实时流式大数据处理实战》 ——3.8 实例——Spark RDD操作

举报
华章计算机 发表于 2020/02/22 18:39:54 2020/02/22
【摘要】 本节书摘来自华章计算机《Spark Streaming实时流式大数据处理实战》 —— 书中第3章,第3.8节,作者是肖力涛 。

3.8  实例——Spark RDD操作

  2.2节中提到过,将两个文本文件分别包含用户名、地址,以及用户名、电话,对这两个文件利用Spark RDD分别进行了多种操作,在本章介绍完所有的RDD操作类型及RDD的依赖后,我们将实现这个实例。建议读者在读到这里后,也跟着笔者一步步实现这个实例,加深对Spark分布式系统的理解,为后续第3篇的Spark Streaming案例实战打下基础,下面我们一步步开始实现。

  读者首先翻到之前的第2章,回顾一下图2.3中对Spark RDD的多个操作流程,程序中的代码与图2.3是完全对应的。同样,我们需要先在scala-eclipse中建立一个simple-maven项目,这里与2.4节实例类似,此处不再赘述。

  在本实例中,我们需要用到Spark,以及一些输出日志的依赖包,另外需要通过Maven插件将所有代码依赖打包成一个完整的jar包并上传到Spark集群中运行,Maven的依赖项如下:

  

  <groupId>com</groupId><!--组织名-->

  <artifactId>rddOperation</artifactId><!--项目名-->

  <version>0.1</version><!--版本号-->

  <dependencies>

   <dependency> <!--Spark核心依赖包 -->

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-core_2.11</artifactId>

    <version>2.2.0</version>

    <scope>provided</scope><!--运行时提供,打包不添加,Spark集群已自带-->

   </dependency>

   <dependency><!--Log日志依赖包 -->

    <groupId>log4j</groupId>

    <artifactId>log4j</artifactId>

    <version>1.2.17</version>

   </dependency>

   <dependency><!--日志依赖接口-->

    <groupId>org.slf4j</groupId>

    <artifactId>slf4j-log4j12</artifactId>

    <version>1.7.12</version>

   </dependency>

  </dependencies>

  

  关于打包的插件项及配置如下,注意对mainClass的修改:

  

  <build>

   <plugins>

     <!--混合Scala/Java编译-->

     <plugin><!--Scala编译插件-->

     <groupId>org.scala-tools</groupId>

     <artifactId>maven-scala-plugin</artifactId>

     <executions>

      <execution>

       <id>compile</id>

       <goals>

       <goal>compile</goal>

      </goals>

      <phase>compile</phase>

      </execution>

      <execution>

       <id>test-compile</id>

       <goals>

        <goal>testCompile</goal>

       </goals>

       <phase>test-compile</phase>

       </execution>

       <execution>

        <phase>process-resources</phase>

        <goals>

         <goal>compile</goal>

        </goals>

       </execution>

      </executions>

     </plugin>

     <plugin>

      <artifactId>maven-compiler-plugin</artifactId>

      <configuration>

       <source>1.7</source><!--设置Java源-->

       <target>1.7</target>

      </configuration>

     </plugin>

     <!-- for fatjar -->

     <plugin><!--将所有依赖包打入同一个jar包-->

      <groupId>org.apache.maven.plugins</groupId>

      <artifactId>maven-assembly-plugin</artifactId>

      <version>2.4</version>

      <configuration>

       <descriptorRefs>

        <descriptorRef>jar-with-dependencies</descriptorRef><!--jar包的后缀名-->

       </descriptorRefs>

      </configuration>

      <executions>

       <execution>

        <id>assemble-all</id>

        <phase>package</phase>

        <goals>

         <goal>single</goal>

        </goals>

       </execution>

      </executions>

     </plugin>

     <plugin>

      <groupId>org.apache.maven.plugins</groupId>

      <artifactId>maven-jar-plugin</artifactId>

      <configuration>

       <archive>

        <manifest>

         <!--添加类路径-->

          <addClasspath>true</addClasspath>

         <!--设置程序的入口类-->

          <mainClass>sparkstreaming_action.rdd.operation.RDDOperation

        </mainClass>

         </manifest>

        </archive>

       </configuration>

      </plugin>

     </plugins>

  </build>

  

  在配置好依赖项后,新建一个package,命名为sparkstreaming_action.rdd.operation,之后在该包下新建一个Scala Object,命名为RDDOperation,完成后如图3.9所示。

 image.png

图3.9  Spark RDD操作实例项目图

  现在进入RDDOperation文件,编写本次实例的代码,具体代码如下:

  

  package sparkstreaming_action.rdd.operation

  

  import org.apache.spark.SparkConf

  import org.apache.spark.SparkContext

  

  object RDDOperation extends App {

    // Spark配置项

    val conf = new SparkConf()

      .setAppName("rddOperation")

      .setMaster("spark://127.0.0.1:7077")

    // 创建Spark上下文

    val sc = new SparkContext(conf)

    // 文件名,读者可根据自己的路径进行修改

    val txtNameAddr = "name_addr.txt"

    val txtNamePhone = "name_phone.txt"

    // 读入用户地址文件,并按照格式切分,得到对应RDD

    val rddNameAddr = sc.textFile(txtNameAddr).map(record => {

      val tokens = record.split(" ")

      (tokens(0), tokens(1))

    }) // RDD1

    rddNameAddr.cache

    // 读入用户电话文件,切分后得到对应的RDD

  val rddNamePhone = sc.textFile(txtNamePhone).map(record => {

      val tokens = record.split(" ")

      (tokens(0), tokens(1))

    }) // RDD2

    rddNamePhone.cache

    // 以用户名的key进行join操作

    val rddNameAddrPhone = rddNameAddr.join(rddNamePhone) // RDD 3

    // 对用户电话RDD进行HTML格式变化,产生新的RDD

    val rddHtml = rddNameAddrPhone.map(record => {

      val name = record._1

      val addr = record._2._1

      val phone = record._2._2

      s"<h2>姓名:${name}</h2><p>地址:${addr}</p><p>电话:${phone}</p>"

    }) // RDD4

    // 输出操作

    val rddOutput = rddHtml.saveAsTextFile("UserInfo")

    // 根据地址格式得到邮编RDD

    val rddPostcode = rddNameAddr.map(record => {

      val postcode = record._2.split("#")(1)

      (postcode, 1)

    }) // RDD5

    // 汇总邮编出现次数

    val rddPostcodeCount = rddPostcode.reduceByKey(_ + _) // rdd6

    // 打印结果

  rddPostcodeCount.collect().foreach(println)

    sc.stop

  }

  

  在上面的实例中,根据RDD所包含的数据对变量进行命名,同时在注释中清晰地注释了RDD 1、RDD2等,对应了图2.3中的每个RDD。我们首先从包含用户名、地址的文件和包含用户名、电话的两个文本文件中读入两个RDD,并在读入的过程中,分别对数据记录进行了预处理,将记录切分成了key/value的形式,即产生了RDD1和RDD2,其中key是用户名,而value分别是地址和电话。

  然后通过join操作,将RDD1和RDD2两者利用key值合并在一起,并通过map函数将其映射成为HTML标签字符串的RDD4,最后通过saveAsTextFile输出到外部文件中。

  另一方面,将RDD1中的地址,利用地址中的#符号进行切割,得到邮编,并以(邮编,1)的key/value形式返回得到RDD5,通过reduceByKey操作,合并key值相同的记录,最后利用collect操作将所有记录收集到Driver节点,并利用println函数打印输出。

  我们构造两个模拟数据name_addr.txt和name_phone.txt,作为整个程序的输入,观察程序的效果,文件如下:

  

  $ cat name_addr.txt

  bob shanghai#200000

  amy beijing#100000

  alice shanghai#200000

  tom beijing#100000

  lulu hangzhou#310000

  nick shanghai#200000

  

  $ cat name_phone.txt

  bob 15700079421

  amy 18700079458

  alice 17730079427

  tom 16700379451

  lulu 18800074423

  nick 14400033426

  

  在name_addr.txt文件中,利用空格将用户名和地址隔开,而地址使用#将城市和邮编隔开;在name_phone.txt文件中,利用空格将用户名和电话隔开。

  利用mvn clean install命令对整个项目进行编译后,通过如下命令运行整个代码:

  

  $ {your_path}/spark-2.2.0-bin-hadoop2.7/bin/spark-submit \

        --class sparkstreaming_action.rdd.operation.RDDOperation \

        --num-executors 4 \

        --driver-memory 1G \

        --executor-memory 1g  \

        --executor-cores 1 \

        --conf spark.default.parallelism=1000 \

        target/rddOperation-0.1-jar-with-dependencies.jar

  

  执行上述命令运行结束后,我们可以看到命令行打印出如下结果:

  

  (100000,2)

  (200000,3)

  (310000,1)

  

  观察上面的输入文件,上海3人、北京2人、杭州1人,是符合预期的。另外,项目根目录会产生一个UserInfo文件夹,进入该文件夹会发现有大量的输出,除了有用的信息外,Spark还输出了大量空文件,可以利用如下命令,查看所有的文件内容:

  

  $ cat *

  <h2>姓名:tom</h2><p>地址:beijing#100000</p><p>电话:16700379451</p>

  <h2>姓名:alice</h2><p>地址:shanghai#200000</p><p>电话:17730079427</p>

  <h2>姓名:nick</h2><p>地址:shanghai#200000</p><p>电话:14400033426</p>

  <h2>姓名:lulu</h2><p>地址:hangzhou#310000</p><p>电话:18800074423</p>

  <h2>姓名:amy</h2><p>地址:beijing#100000</p><p>电话:18700079458</p>

  <h2>姓名:bob</h2><p>地址:shanghai#200000</p><p>电话:15700079421</p>

  与上面的输入文件进行对比可以发现,join操作已经将key值相同的用户名整合在一起,也按照我们的预期,将HTML代码拼接好,输出到外部文件中。


【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。