spark 学习笔记(部署过程等)

举报
菜鸟级攻城狮 发表于 2021/03/14 15:40:59 2021/03/14
【摘要】 1.准备环境2.安装JDK3.上传spark安装包4.解压spark并修改配置文件(两个配置文件,第一个配置文件添加了3个配置文件)5.将配置好的spark安装程序拷贝给其他机器for i in {5..8}; do scp -r /bigdata/spark-2.2.0-bin-hadoop2.7/ node-$i:/bigdata; done 6.启动spark (sbin/start-...

1.准备环境
2.安装JDK
3.上传spark安装包
4.解压spark并修改配置文件(两个配置文件,第一个配置文件添加了3个配置文件)
5.将配置好的spark安装程序拷贝给其他机器for i in {5..8}; do scp -r /bigdata/spark-2.2.0-bin-hadoop2.7/ node-$i:/bigdata; done 

6.启动spark (sbin/start-all.sh)  
  问题:Worker怎么知道Master在哪里嗯?读取spark-env.sh文件得知Master在哪里的
7.通过web页面访问spark管理页面(master所在机器的地址+8080端口)

-----------------------------

配置了高可用的spark集群,修改了一个配置文件
配置了Worker运行时的资源(内存、cores)

启动集群
    1.启动ZK集群
    2.启动spark集群,但是只会启动一个Master,另外一个Master手动启动

-----------------------------

提交第一个spark应用到集群中运行
bin/spark-submit --master spark://node-5:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.2.0.jar 100


 bin/spark-submit --master spark://node-5:7077 --class org.apache.spark.examples.SparkPi --executor-memory 2048mb --total-executor-cores 12  examples/jars/spark-examples_2.11-2.2.0.jar 1000

--executor-memory 每个executor使用的内存大小
--total-executor-cores 整个app使用的核数

-----------------------------

提交一个spark程序到spark集群,会产生哪些进程?

SparkSubmint(Driver)提交任务
Executor 执行真正的计算任务的

提交任务可以指定多个master地址,目的是为了提交任务高可用

 bin/spark-submit --master spark://node-4:7077,node-5:7077 --class org.apache.spark.examples.SparkPi --executor-memory 2048mb --total-executor-cores 12  examples/jars/spark-examples_2.11-2.2.0.jar 1000

 ----------------------------

Spark Shell(是一个交互式的命令行,里面可以写spark程序,方便学习和测试,他也是一个客户端,用于提交spark应用程序)

/bigdata/spark-2.2.0-bin-hadoop2.7/bin/spark-shell
上面的方式没有指定master的地址,即用的是spark的local模式运行的(模拟的spark集群用心的过程)

/bigdata/spark-2.2.0-bin-hadoop2.7/bin/spark-shell --master spark://node-4:7077,node-5:7077
上面是指定了master的地址,那么就会将任务提交到集群,开始时sparksubmit(客户端)要连接Master,并申请计算资源(内存和核数),Master进行资源调度(就是让那些Worker启动Executor),在准备工作时,这些进程都已经创建好了

用spark Shell完成WordCount计算
启动HDFS(上传数据到hdfs),sc是spark core(RDD)的执行入口
sc.textFile("hdfs://node-4:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).collect

 ----------------------------

 Yarn和Spark的StandAlone调度模式对比

 ResouceManager        Master   管理子节点、资源调度、接收任务请求
 NodeManger            Worker   管理当前节点,并管理子进程
 YarnChild             Executor 运行真正的计算逻辑的(Task)
 Client                SparkSubmit  (Client + ApplicaitonMaster)提交app,管理该任务的Executor
 ApplicaitonMaster                      并将Task提交到(Executor)
     
----------------------------

用idea编写spark程序
创建RDD,然后对RDD进行操作(调用RDD的方法,方法分为两类,一个叫Transformation(懒 lazy),一类叫Action(会执行任务))

rdd上的方法和scala原生的方法是有区别的

写好程序,打包上传集群运行

本地模式运行spark程序,setMaster("local[*]")

----------------------------

RDD的算子

#常用Transformation(即转换,延迟加载)
#通过并行化scala集合创建RDD
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
#查看该rdd的分区数量
rdd1.partitions.length


val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
val rdd3 = rdd2.filter(_>10)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)


val rdd4 = sc.parallelize(Array("a b c", "d e f", "h i j"))
rdd4.flatMap(_.split(' ')).collect

val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b")))


List("a b c", "a b b") =List("a","b",))

rdd5.flatMap(_.flatMap(_.split(" "))).collect

#union求并集,注意类型要一致
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect

#intersection求交集
val rdd9 = rdd6.intersection(rdd7)


val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))

#join(连接)
val rdd3 = rdd1.join(rdd2)
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd3 = rdd1.rightOuterJoin(rdd2)


#groupByKey
val rdd3 = rdd1 union rdd2
rdd3.groupByKey
//(tom,CompactBuffer(1, 8, 2))
rdd3.groupByKey.map(x=>(x._1,x._2.sum))
groupByKey.mapValues(_.sum).collect
Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(9, 2)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)))


#WordCount
sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).groupByKey.map(t=>(t._1, t._2.sum)).collect

#cogroup
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.cogroup(rdd2)
val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))

#cartesian笛卡尔积
val rdd1 = sc.parallelize(List("tom", "jerry"))
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
val rdd3 = rdd1.cartesian(rdd2)

###################################################################################################

#spark action
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)

#collect
rdd1.collect

#reduce
val r = rdd1.reduce(_+_)

#count
rdd1.count

#top
rdd1.top(2)

#take
rdd1.take(2)

#first(similer to take(1))
rdd1.first

#takeOrdered
rdd1.takeOrdered(3)

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。