Spark集群搭建

举报
yd_254825535 发表于 2023/11/15 12:44:20 2023/11/15
【摘要】 本实验主要介绍了如何安装Spark集群,并使Spark能够读取OBS数据,实现存算分离,提高计算性能。同时,通过编写pyspark脚本,完成了数据清洗和处理,实现了相关任务。整个实验过程详细地描述了每一步的操作,并给出了实验结论。通过本实验,我们能够更好地了解Spark集群的搭建和数据处理的流程,为我们今后的工作打下了良好的基础。

报告摘要

本实验主要介绍了如何安装Spark集群,并使Spark能够读取OBS数据,实现存算分离,提高计算性能。同时,通过编写pyspark脚本,完成了数据清洗和处理,实现了相关任务。整个实验过程详细地描述了每一步的操作,并给出了实验结论。通过本实验,我们能够更好地了解Spark集群的搭建和数据处理的流程,为我们今后的工作打下了良好的基础。

1.1 实验介绍

1.1.1 关于本实验

本实验介绍安装Spark集群,并使Spark能够读取OBS数据,通过该实验使Spark集群能够实现存算分离,提高计算性能。本实验编写pyspark脚本,完成若干任务

1.1.2 实验目的

  • 掌握Spark集群搭建

  • 掌握Spark集群与OBS互联,实现存算分离

  • 掌握Spark编程,完成相关任务

1.2 Spark集群存算分离

1.2.1 搭建Spark集群

步骤 1 获取spark安装包

node-0001节点下载Spark安装包

cd /root
wget https://archive.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-without-hadoop.tgz

注:安装包默认存放位置为/root。若不可使用wget下载文件,则可在实验前由授课老师提前下载完成。待实验开始前,由老师通过U盘拷贝或scp远程发送给学员。
image.png

步骤 2 解压spark安装包

node-0001节点执行下列命令:

复制安装包到/home/modules目录下

image.png

cp /root/spark-2.3.0-bin-without-hadoop.tgz /home/modules
cd /home/modules

解压安装包

tar -zxvf spark-2.3.0-bin-without-hadoop.tgz
mv spark-2.3.0-bin-without-hadoop spark-2.3.0

image.png

步骤 3 配置spark jar包

在node-0001节点,复制jar包到spark/jar下
image.png

cp /root/hadoop-huaweicloud-2.8.3-hw-39.jar /home/modules/spark-2.3.0/jars/
cp /home/modules/hadoop-2.8.3/share/hadoop/common/lib/snappy-java-1.0.4.1.jar /home/modules/spark-2.3.0/jars/

步骤 4 配置spark配置文件

node-0001节点执行下列命令:

cd /home/modules/spark-2.3.0/conf/
mv spark-env.sh.template spark-env.sh
vim spark-env.sh

image.png

文件末尾添加如下内容:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.232.b09-0.el7_7.aarch64
export SCALA_HOME=/home/modules/spark-2.3.0/examples/src/main/scala
export HADOOP_HOME=/home/modules/hadoop-2.8.3
export HADOOP_CONF_DIR=/home/modules/hadoop-2.8.3/etc/hadoop
export SPARK_HOME=/home/modules/spark-2.3.0
export SPARK_DIST_CLASSPATH=$(/home/modules/hadoop-2.8.3/bin/hadoop classpath)

步骤 5 分发Spark

node-0001节点执行下列命令:
image.png

scp -r /home/modules/spark-2.3.0/ root@node-0002:/home/modules/
scp -r /home/modules/spark-2.3.0/ root@node-0003:/home/modules/
scp -r /home/modules/spark-2.3.0/ root@node-0004:/home/modules/

image.png

image.png

步骤 6 配置环境变量

各节点执行:

vim /etc/profile

添加如下内容:

image.png

export SPARK_HOME=/home/modules/spark-2.3.0
export PATH=${SPARK_HOME}/bin:${SPARK_HOME}/sbin:$PATH

保存退出

各节点执行如下命令,使环境变量生效:

source /etc/profile

1.2.2 验证存算分离

步骤 1 查看要计算的文件

本次实验验证Spark与OBS实现存算分离,数据使用实验2.3.2中上传的playerinfo.txt文件。

数据如下:

Alex James Lax Genu

Kerry Mary Olivia William

Hale Edith Vera Robert

Mary Olivia James Lax

Edith Vera Robertm Genu

步骤 2 计算上述数据的wordcount

启动yarn

start-yarn.sh

启动pyspark

image.png

[root@node-0001 conf]# pyspark

输入下列代码:

lines = spark.read.text("obs://bigdatapro-obs-c2e9/").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
output = counts.collect()
for (word, count) in output:
	print("%s: %i" % (word, count))

image.png

image.png

2.1 Spark编程

2.1.1 输出评分最高的前十个电影的电影名

from pyspark.sql.functions import desc
movies = spark.read.format('csv').option('header', False).option('delimiter', ',').load('obs://bigdatapro-obs-7185/movie_clean.csv').toDF('id', 'title', 'director', 'grade', 'num') 
# 读取movie_clean.csv中的数据按照csv的格式并制作数据帧
# 数据帧的格式为(id,title,director,grade,num)
movies = movies.withColumn('grade',movies['grade'].cast('float')) 
# 使用orderBy函数以grade为关键字,保留前10调数据
top10_movies = movies.orderBy(desc('grade')).limit(10) 
top10_movies =top10_movies.collect() 
for a in top10_movies: 
    print(a['title'])
# 输出电影名字

image.png

2.1.2 输出平均每部电影有多少人进行评价

from pyspark.sql.functions import desc
from pyspark.sql.functions import avg
# 读取movie_clean.csv中的数据按照csv的格式并制作数据帧
# 数据帧的格式为(id,title,director,grade,num)
movies = spark.read.format('csv').option('header', False).option('delimiter', ',').load('obs://bigdatapro-obs-7185/movie_clean.csv').toDF('id', 'title', 'director', 'grade', 'num') 
# 使用avg计算num列的平均值
avg_num = movies.agg(avg('num')).collect()[0][0]
print("The average number of comment per movie is", avg_num)

image.png

2.1.3 输出每个分段的电影各有多少部

from pyspark.sql.functions import floor,col
# 将电影评分按照0.5分为间隔进行分段,并计算每个分段内有多少部电影
movies = spark.read.format('csv').option('header', False).option('delimiter', ',').load('obs://bigdatapro-obs-7185/movie_clean.csv').toDF('id', 'title', 'director', 'grade', 'num') 
movie_count = movies.groupBy(floor(col('grade')*2)/2).count().orderBy(floor(col('grade')*2)/2)
# 按照0.5的间隔分组 分组公式为 floor(grade*2)/2
movie_count.show()

image.png

3.1 理论性能分析

本次实验的数据针对csv格式的数据进行处理。考虑将数据处理成dataFrame。Spark DataFrame是基于Spark SQL,能够利用分布式计算和内存计算,使数据处理速度更快。DataFrame通常能够自动将数据压缩到内存中,这可以减少数据的I/O操作和内存占用,从而提高性能。数据压缩还可以减少数据移动的开销,使数据传输更加高效。DataFrame使用优化器来优化查询计划,以便在查询时最小化数据移动和计算开销。优化器可以自动选择最佳的执行计划,从而提高查询性能。DataFrame使用列式存储来存储数据,这种存储方式可以提高数据读取和查询的性能。列式存储可以减少I/O操作和内存占用,从而提高数据处理速度。

总结展望

本次Spark大数据实验,我学习到了Spark的基本概念、架构和一些常用的API。我通过实验,掌握了Spark的基本使用方法,包括创建SparkSession、读取和处理数据、使用DataFrame和SQL等。 通过本次实验,我深刻认识到了大数据技术在数据分析和处理中的重要性。Spark作为一种快速、高效、可扩展的大数据处理框架,可以帮助我们有效地处理大数据,并提取有用的信息和见解。同时,Spark还提供了一些机器学习算法,可以帮助我们进行数据挖掘和预测分析。 展望未来,我希望能够进一步学习和深入了解Spark的高级特性和性能优化方法,以便更好地利用Spark来处理和分析大数据。同时,我还希望能够结合实际业务场景,探索和应用更加先进和创新的大数据技术,为企业和社会创造更大的价值。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。