spark 学习笔记(部署过程等)
1.准备环境
2.安装JDK
3.上传spark安装包
4.解压spark并修改配置文件(两个配置文件,第一个配置文件添加了3个配置文件)
5.将配置好的spark安装程序拷贝给其他机器for i in {5..8}; do scp -r /bigdata/spark-2.2.0-bin-hadoop2.7/ node-$i:/bigdata; done
6.启动spark (sbin/start-all.sh)
问题:Worker怎么知道Master在哪里嗯?读取spark-env.sh文件得知Master在哪里的
7.通过web页面访问spark管理页面(master所在机器的地址+8080端口)
-----------------------------
配置了高可用的spark集群,修改了一个配置文件
配置了Worker运行时的资源(内存、cores)
启动集群
1.启动ZK集群
2.启动spark集群,但是只会启动一个Master,另外一个Master手动启动
-----------------------------
提交第一个spark应用到集群中运行
bin/spark-submit --master spark://node-5:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.2.0.jar 100
bin/spark-submit --master spark://node-5:7077 --class org.apache.spark.examples.SparkPi --executor-memory 2048mb --total-executor-cores 12 examples/jars/spark-examples_2.11-2.2.0.jar 1000
--executor-memory 每个executor使用的内存大小
--total-executor-cores 整个app使用的核数
-----------------------------
提交一个spark程序到spark集群,会产生哪些进程?
SparkSubmint(Driver)提交任务
Executor 执行真正的计算任务的
提交任务可以指定多个master地址,目的是为了提交任务高可用
bin/spark-submit --master spark://node-4:7077,node-5:7077 --class org.apache.spark.examples.SparkPi --executor-memory 2048mb --total-executor-cores 12 examples/jars/spark-examples_2.11-2.2.0.jar 1000
----------------------------
Spark Shell(是一个交互式的命令行,里面可以写spark程序,方便学习和测试,他也是一个客户端,用于提交spark应用程序)
/bigdata/spark-2.2.0-bin-hadoop2.7/bin/spark-shell
上面的方式没有指定master的地址,即用的是spark的local模式运行的(模拟的spark集群用心的过程)
/bigdata/spark-2.2.0-bin-hadoop2.7/bin/spark-shell --master spark://node-4:7077,node-5:7077
上面是指定了master的地址,那么就会将任务提交到集群,开始时sparksubmit(客户端)要连接Master,并申请计算资源(内存和核数),Master进行资源调度(就是让那些Worker启动Executor),在准备工作时,这些进程都已经创建好了
用spark Shell完成WordCount计算
启动HDFS(上传数据到hdfs),sc是spark core(RDD)的执行入口
sc.textFile("hdfs://node-4:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).collect
----------------------------
Yarn和Spark的StandAlone调度模式对比
ResouceManager Master 管理子节点、资源调度、接收任务请求
NodeManger Worker 管理当前节点,并管理子进程
YarnChild Executor 运行真正的计算逻辑的(Task)
Client SparkSubmit (Client + ApplicaitonMaster)提交app,管理该任务的Executor
ApplicaitonMaster 并将Task提交到(Executor)
----------------------------
用idea编写spark程序
创建RDD,然后对RDD进行操作(调用RDD的方法,方法分为两类,一个叫Transformation(懒 lazy),一类叫Action(会执行任务))
rdd上的方法和scala原生的方法是有区别的
写好程序,打包上传集群运行
本地模式运行spark程序,setMaster("local[*]")
----------------------------
RDD的算子
#常用Transformation(即转换,延迟加载)
#通过并行化scala集合创建RDD
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
#查看该rdd的分区数量
rdd1.partitions.length
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
val rdd3 = rdd2.filter(_>10)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
val rdd4 = sc.parallelize(Array("a b c", "d e f", "h i j"))
rdd4.flatMap(_.split(' ')).collect
val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b")))
List("a b c", "a b b") =List("a","b",))
rdd5.flatMap(_.flatMap(_.split(" "))).collect
#union求并集,注意类型要一致
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect
#intersection求交集
val rdd9 = rdd6.intersection(rdd7)
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))
#join(连接)
val rdd3 = rdd1.join(rdd2)
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd3 = rdd1.rightOuterJoin(rdd2)
#groupByKey
val rdd3 = rdd1 union rdd2
rdd3.groupByKey
//(tom,CompactBuffer(1, 8, 2))
rdd3.groupByKey.map(x=>(x._1,x._2.sum))
groupByKey.mapValues(_.sum).collect
Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(9, 2)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)))
#WordCount
sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).groupByKey.map(t=>(t._1, t._2.sum)).collect
#cogroup
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.cogroup(rdd2)
val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))
#cartesian笛卡尔积
val rdd1 = sc.parallelize(List("tom", "jerry"))
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
val rdd3 = rdd1.cartesian(rdd2)
###################################################################################################
#spark action
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
#collect
rdd1.collect
#reduce
val r = rdd1.reduce(_+_)
#count
rdd1.count
#top
rdd1.top(2)
#take
rdd1.take(2)
#first(similer to take(1))
rdd1.first
#takeOrdered
rdd1.takeOrdered(3)
- 点赞
- 收藏
- 关注作者
评论(0)