spark 环境搭建

举报
小米粒-biubiubiu 发表于 2020/12/02 23:02:42 2020/12/02
【摘要】 一、spark 安装 配置  基于上一篇博客 CentOS6.4环境编译Spark-2.1.0源码,将编译出来的spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz进行解压,并配置环境变量。 执行 spark-shell --master local[2]  命令, 看到如图所示界面 说明安装成功:本地模式 二、从词频统计着手入门spark pa...

一、spark 安装 配置 

基于上一篇博客 CentOS6.4环境编译Spark-2.1.0源码,将编译出来的spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz进行解压,并配置环境变量。

执行 spark-shell --master local[2]  命令,

看到如图所示界面 说明安装成功:本地模式

二、从词频统计着手入门spark


  
  1. package org.apache.spark.examples.streaming
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.storage.StorageLevel
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. /**
  6. * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
  7. *
  8. * Usage: NetworkWordCount <hostname> <port>
  9. * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
  10. *
  11. * To run this on your local machine, you need to first run a Netcat server
  12. * `$ nc -lk 9999`
  13. * and then run the example
  14. * `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
  15. */
  16. object NetworkWordCount {
  17. def main(args: Array[String]) {
  18. if (args.length < 2) {
  19. System.err.println("Usage: NetworkWordCount <hostname> <port>")
  20. System.exit(1)
  21. }
  22. StreamingExamples.setStreamingLogLevels()
  23. // Create the context with a 1 second batch size
  24. val sparkConf = new SparkConf().setAppName("NetworkWordCount")
  25. val ssc = new StreamingContext(sparkConf, Seconds(1))
  26. // Create a socket stream on target ip:port and count the
  27. // words in input stream of \n delimited text (eg. generated by 'nc')
  28. // Note that no duplication in storage level only for running locally.
  29. // Replication necessary in distributed scenario for fault tolerance.
  30. val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
  31. val words = lines.flatMap(_.split(" "))
  32. val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  33. wordCounts.print()
  34. ssc.start()
  35. ssc.awaitTermination()
  36. }
  37. }
  38. // scalastyle:on println

执行 nc -lk 9999 启动一个 9999网络端口,后面 输入 单词数据测试使用

1、spark-submit  方式提交spark应用程序运行

spark-submit   --master local[2]  \

--class org.apache.spark.examples.streaming.NetworkWordCount  \

--name NetworkWordCount \

/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar  hadoop000  9999

 

在nc -lk  9999下面输入a a a b b c d e  以空格隔开的单词,按回车键。就可以看到 

2、spark-shell  --master local[2] 方式 提交 spark 应用任务

等待 spark  shell  启动好之后,直接将以下代码 粘贴进去即可。


  
  1. import org.apache.spark.streaming.{Seconds, StreamingContext}
  2. val ssc = new StreamingContext(sparkConf, Seconds(1)) //每隔一秒执行一次
  3. val lines = ssc.socketTextStream("hadoop000", 9999) //ip 端口
  4. val words = lines.flatMap(_.split(" "))
  5. val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  6. wordCounts.print()
  7. ssc.start()
  8. ssc.awaitTermination()

同样在 nc  -lk  9999 下面 输入 d d e e f f  按回车键

三、 spark streaming 的工作原理(粗粒度)

 spark streaming 接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给

spark  engine 处理。 

四、spark streaming 核心概念

  •  StreamingContext

使用频次较高的构造方法


  
  1. def this(sparkContext: SparkContext, batchDuration: Duration) = {
  2. this(sparkContext, null, batchDuration)
  3. }

  
  1. def this(conf: SparkConf, batchDuration: Duration) = {
  2. this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  3. }

batch  interval  可以根据你的应用程序需求的延迟需求以及集群可以用的资源情况来设置。

  • DStreams

对 DStream 操作算子,比如map/flatMap ,其实底层会被翻译为对 DStream 中的每个RDD 都做相同的操作,因为一个

DStream是 由不同批次的RDD所构成的。

  • Input Dstreams  和  Receivers

input Dstreams  代表 从源头接收过来的数据流

receivers  接收 数据流

  • Transformations  on DStreams

五、spark  streaming  实战 之 处理 socket 数据


  
  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. /**
  4. * 测试 :采用 nc -lk 6789 进行测试
  5. */
  6. object NetworkWordCount {
  7. def main(args: Array[String]): Unit = {
  8. /**
  9. * 这里要设置成local[2]。
  10. */
  11. val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  12. /**
  13. * 创建streamingContext
  14. */
  15. val ssc = new StreamingContext(sparkConf, Seconds(5))
  16. val lines = ssc.socketTextStream("192.168.42.85", 6789)
  17. val result = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)
  18. result.print()
  19. ssc.start()
  20. ssc.awaitTermination()
  21. }
  22. }

local[?] 对于 是需要 receiver 接收数据的处理任务, 那么本地测试时,local[?] ,?一定是要大于1的,否则receiver 已经占用了一个线程,对于 后面的 operation 就没有 对于的线程可以使用。  

六、spark  streaming 处理文件系统处理


  
  1. object FileWordCount {
  2. def main(args: Array[String]): Unit = {
  3. //此处的local 可以 设置为 local[1],因为 他不需要 receiver 接收 数据
  4. val sparkConf = new SparkConf().setMaster("local").setAppName("FileWordCount")
  5. val ssc = new StreamingContext(sparkConf,Seconds(5))
  6. val lines = ssc.textFileStream("file:///home/logs/")
  7. val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
  8. result.print()
  9. ssc.start()
  10. ssc.awaitTermination()
  11. }
  12. }

启动 main 方法, 在 home/logs 下 新建 a.log 文件,并向其中输入 a  b c  d 保存,看到控制台 输出 单词统计的结果

a  b  c  d  频次均为1 ,说明任务运行成功,再新建 b.log,向 其中输入 e e f f d r 保存,看到控制台输出单词统计结果

,e 频次 2,f 频次 2,d频次 1,r频次 1 说明运行成功,但是当我们 再次 向 a.log 、b.log 中写入新的单词时候,spark 

streaming 并不会 再次去进行 单词统计任务的执行。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/85615733

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。