Spark集群搭建
报告摘要
本实验主要介绍了如何安装Spark集群,并使Spark能够读取OBS数据,实现存算分离,提高计算性能。同时,通过编写pyspark脚本,完成了数据清洗和处理,实现了相关任务。整个实验过程详细地描述了每一步的操作,并给出了实验结论。通过本实验,我们能够更好地了解Spark集群的搭建和数据处理的流程,为我们今后的工作打下了良好的基础。
1.1 实验介绍
1.1.1 关于本实验
本实验介绍安装Spark集群,并使Spark能够读取OBS数据,通过该实验使Spark集群能够实现存算分离,提高计算性能。本实验编写pyspark脚本,完成若干任务
1.1.2 实验目的
掌握Spark集群搭建
掌握Spark集群与OBS互联,实现存算分离
掌握Spark编程,完成相关任务
1.2 Spark集群存算分离
1.2.1 搭建Spark集群
步骤 1 获取spark安装包
node-0001节点下载Spark安装包
cd /root
wget https://archive.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-without-hadoop.tgz
注:安装包默认存放位置为/root。若不可使用wget下载文件,则可在实验前由授课老师提前下载完成。待实验开始前,由老师通过U盘拷贝或scp远程发送给学员。
步骤 2 解压spark安装包
node-0001节点执行下列命令:
复制安装包到/home/modules目录下
cp /root/spark-2.3.0-bin-without-hadoop.tgz /home/modules
cd /home/modules
解压安装包
tar -zxvf spark-2.3.0-bin-without-hadoop.tgz
mv spark-2.3.0-bin-without-hadoop spark-2.3.0
步骤 3 配置spark jar包
在node-0001节点,复制jar包到spark/jar下
cp /root/hadoop-huaweicloud-2.8.3-hw-39.jar /home/modules/spark-2.3.0/jars/
cp /home/modules/hadoop-2.8.3/share/hadoop/common/lib/snappy-java-1.0.4.1.jar /home/modules/spark-2.3.0/jars/
步骤 4 配置spark配置文件
node-0001节点执行下列命令:
cd /home/modules/spark-2.3.0/conf/
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
文件末尾添加如下内容:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.232.b09-0.el7_7.aarch64
export SCALA_HOME=/home/modules/spark-2.3.0/examples/src/main/scala
export HADOOP_HOME=/home/modules/hadoop-2.8.3
export HADOOP_CONF_DIR=/home/modules/hadoop-2.8.3/etc/hadoop
export SPARK_HOME=/home/modules/spark-2.3.0
export SPARK_DIST_CLASSPATH=$(/home/modules/hadoop-2.8.3/bin/hadoop classpath)
步骤 5 分发Spark
node-0001节点执行下列命令:
scp -r /home/modules/spark-2.3.0/ root@node-0002:/home/modules/
scp -r /home/modules/spark-2.3.0/ root@node-0003:/home/modules/
scp -r /home/modules/spark-2.3.0/ root@node-0004:/home/modules/
步骤 6 配置环境变量
各节点执行:
vim /etc/profile
添加如下内容:
export SPARK_HOME=/home/modules/spark-2.3.0
export PATH=${SPARK_HOME}/bin:${SPARK_HOME}/sbin:$PATH
保存退出
各节点执行如下命令,使环境变量生效:
source /etc/profile
1.2.2 验证存算分离
步骤 1 查看要计算的文件
本次实验验证Spark与OBS实现存算分离,数据使用实验2.3.2中上传的playerinfo.txt文件。
数据如下:
Alex James Lax Genu
Kerry Mary Olivia William
Hale Edith Vera Robert
Mary Olivia James Lax
Edith Vera Robertm Genu
步骤 2 计算上述数据的wordcount
启动yarn
启动pyspark
[root@node-0001 conf]# pyspark
输入下列代码:
lines = spark.read.text("obs://bigdatapro-obs-c2e9/").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
2.1 Spark编程
2.1.1 输出评分最高的前十个电影的电影名
from pyspark.sql.functions import desc
movies = spark.read.format('csv').option('header', False).option('delimiter', ',').load('obs://bigdatapro-obs-7185/movie_clean.csv').toDF('id', 'title', 'director', 'grade', 'num')
# 读取movie_clean.csv中的数据按照csv的格式并制作数据帧
# 数据帧的格式为(id,title,director,grade,num)
movies = movies.withColumn('grade',movies['grade'].cast('float'))
# 使用orderBy函数以grade为关键字,保留前10调数据
top10_movies = movies.orderBy(desc('grade')).limit(10)
top10_movies =top10_movies.collect()
for a in top10_movies:
print(a['title'])
# 输出电影名字
2.1.2 输出平均每部电影有多少人进行评价
from pyspark.sql.functions import desc
from pyspark.sql.functions import avg
# 读取movie_clean.csv中的数据按照csv的格式并制作数据帧
# 数据帧的格式为(id,title,director,grade,num)
movies = spark.read.format('csv').option('header', False).option('delimiter', ',').load('obs://bigdatapro-obs-7185/movie_clean.csv').toDF('id', 'title', 'director', 'grade', 'num')
# 使用avg计算num列的平均值
avg_num = movies.agg(avg('num')).collect()[0][0]
print("The average number of comment per movie is", avg_num)
2.1.3 输出每个分段的电影各有多少部
from pyspark.sql.functions import floor,col
# 将电影评分按照0.5分为间隔进行分段,并计算每个分段内有多少部电影
movies = spark.read.format('csv').option('header', False).option('delimiter', ',').load('obs://bigdatapro-obs-7185/movie_clean.csv').toDF('id', 'title', 'director', 'grade', 'num')
movie_count = movies.groupBy(floor(col('grade')*2)/2).count().orderBy(floor(col('grade')*2)/2)
# 按照0.5的间隔分组 分组公式为 floor(grade*2)/2
movie_count.show()
3.1 理论性能分析
本次实验的数据针对csv格式的数据进行处理。考虑将数据处理成dataFrame。Spark DataFrame是基于Spark SQL,能够利用分布式计算和内存计算,使数据处理速度更快。DataFrame通常能够自动将数据压缩到内存中,这可以减少数据的I/O操作和内存占用,从而提高性能。数据压缩还可以减少数据移动的开销,使数据传输更加高效。DataFrame使用优化器来优化查询计划,以便在查询时最小化数据移动和计算开销。优化器可以自动选择最佳的执行计划,从而提高查询性能。DataFrame使用列式存储来存储数据,这种存储方式可以提高数据读取和查询的性能。列式存储可以减少I/O操作和内存占用,从而提高数据处理速度。
总结展望
本次Spark大数据实验,我学习到了Spark的基本概念、架构和一些常用的API。我通过实验,掌握了Spark的基本使用方法,包括创建SparkSession、读取和处理数据、使用DataFrame和SQL等。 通过本次实验,我深刻认识到了大数据技术在数据分析和处理中的重要性。Spark作为一种快速、高效、可扩展的大数据处理框架,可以帮助我们有效地处理大数据,并提取有用的信息和见解。同时,Spark还提供了一些机器学习算法,可以帮助我们进行数据挖掘和预测分析。 展望未来,我希望能够进一步学习和深入了解Spark的高级特性和性能优化方法,以便更好地利用Spark来处理和分析大数据。同时,我还希望能够结合实际业务场景,探索和应用更加先进和创新的大数据技术,为企业和社会创造更大的价值。
- 点赞
- 收藏
- 关注作者
评论(0)