Spark集群搭建与大数据应用
本实验使用了华为云服务提供的基于鲲鹏计算架构的远程ECS主机和OBS对象存储服务,完成了Spark集群搭建任务、HDFS环境配置和系统功能验证等实验任务;同时安装Spark集群,并使Spark能够读取OBS数据,通过该实验使Spark集群能够实现存算分离,提高计算性能。实验结束后,成功在4个ECS服务器节点上部署运行了HDFS和PySpark,成功跑出了单词计数算法WordCount和豆瓣影评数据分析,且分布式系统可以通过SSH协议正常访问;同时,通过实验任务的推进也拓展了有关于虚拟化技术与云计算课程的相关知识。
一、实验内容
使用华为云提供的ECS云服务,通过搭建开源Hadoop集群从而掌握Hadoop搭建方法,然后在此基础上使用Spark计算。并且利用对象存储服务,使开源Hadoop与华为云OBS服务互联,使Hadoop集群可读取OBS数据。
具体包括以下几个方面的实践内容:
(1)完成Hadoop集群的搭建;
(2)完成Spark与OBS的互联;
(3)完成WordCount、存算分离;
(4)编写程序实现Spark影评分析。
二、实验原理与参数
本次实验的原理与参数介绍如下:
2.1 Spark集群
Spark是一种分布式计算框架,它可以在大型集群上高效地执行数据处理、机器学习、图计算等任务。Spark采用了基于内存的数据处理方式,能够将数据缓存到内存中,从而大大提高了计算性能。如下图所示:
一个Spark集群通常由以下几个组件组成:
1. Driver节点
Driver节点是Spark应用程序的主节点,负责协调整个应用程序的执行过程。Driver节点通常运行在集群管理系统之外的独立节点上,用于启动Spark应用程序,并将任务分发给Executor节点执行。
2. Executor节点
Executor节点是Spark集群中的工作节点,用于执行Spark应用程序的具体计算任务。每个Executor节点通常都运行在独立的进程中,可以并行地处理多个任务,并将计算结果返回给Driver节点。
3. Cluster Manager
Cluster Manager是Spark集群的管理组件,用于管理和协调整个集群的资源分配和任务调度。常见的Cluster Manager包括Standalone、YARN、Mesos等。
4. Spark应用程序
Spark应用程序是用户编写的具体业务逻辑代码,它可以被打包成Jar包或直接提交到Spark集群运行。Spark应用程序通常包括如下几个部分:
(1)建立SparkContext对象:SparkContext是Spark应用程序的核心组件,用于连接Spark集群并管理计算任务的执行过程。
(2)加载数据:Spark应用程序通常需要从不同的数据源(如HDFS、关系型数据库等)中加载数据,并将其转换为RDD(弹性分布式数据集)。
(3)数据转换和计算:Spark应用程序通过一系列的数据转换和算子操作,对RDD进行处理并得到最终结果。
(4)持久化和输出:Spark应用程序可能需要将运算结果输出到外部存储系统(如HDFS、关系型数据库等)或持久化到内存中以供后续使用。
总的来说,Spark集群是一个由多个节点组成的集群,其中包括Driver节点、Executor节点、Cluster Manager以及Spark应用程序等组件,它可以高效地处理大数据量的计算任务,并为企业提供更好的数据分析和处理支持。
2.2 远程连接手段
本次实验使用远程连接工具对ECS主机进行访问与操作。使用的工具包括PuTTY、Xshell等,均用到了SSH协议:如下图所示。SSH是一种加密的网络传输协议,可在不安全的网络中网络服务提供安全的传输环境,通过在网络中创建安全隧道来实现 SSH 客户端和服务器之间的连接。
2.3 实验参数设置
2.3.1 CPU/GPU型号与参数
表2-1 CPU参数
属性 | 型号 |
核心数
|
架构 | 规格名称 | BogoMIPS |
---|---|---|---|---|---|
参数
|
Huawei Kunpeng 920 2.6GHz | 8 | 鲲鹏计算 | kc1.2xlarge.4 | 200.00 |
表2-2 GPU参数
属性 | 设备编码 | 规格 |
---|---|---|
参数 | 02:03.0 | Virtio GPU(rev 01) |
2.3.2 内存容量与带宽
表2-3 主存参数
属性 | 内存容量 | 基准/最大带宽 | 可用空间 |
---|---|---|---|
参数 | 32 GiB | 3.0/7 Gb/s | 19602599 kB |
2.3.3 网络参数
Node-0001:
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500
inet 192.168.0.223 netmask 255.255.255.0 broadcast 192.168.0.255
inet6 fe80::f816:3eff:fe47:9f7e prefixlen 64 scopeid 0x20<link>
ether fa:16:3e:47:9f:7e txqueuelen 1000 (Ethernet)
RX packets 748627 bytes 546480634 (521.1 MiB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 607953 bytes 2637082994 (2.4 GiB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
lo: flags=73<UP,LOOPBACK,RUNNING> mtu 65536
inet 127.0.0.1 netmask 255.0.0.0
inet6 ::1 prefixlen 128 scopeid 0x10<host>
loop txqueuelen 1000 (Local Loopback)
RX packets 3299 bytes 336131 (328.2 KiB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 3299 bytes 336131 (328.2 KiB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
Node-0002:
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500
inet 192.168.0.204 netmask 255.255.255.0 broadcast 192.168.0.255
inet6 fe80::f816:3eff:fe47:9f6b prefixlen 64 scopeid 0x20<link>
ether fa:16:3e:47:9f:6b txqueuelen 1000 (Ethernet)
RX packets 392745 bytes 1271822978 (1.1 GiB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 245151 bytes 18219175 (17.3 MiB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
lo: flags=73<UP,LOOPBACK,RUNNING> mtu 65536
inet 127.0.0.1 netmask 255.0.0.0
inet6 ::1 prefixlen 128 scopeid 0x10<host>
loop txqueuelen 1000 (Local Loopback)
RX packets 73 bytes 11086 (10.8 KiB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 73 bytes 11086 (10.8 KiB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
Node-0003:
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500
inet 192.168.0.189 netmask 255.255.255.0 broadcast 192.168.0.255
inet6 fe80::f816:3eff:fe47:9f5c prefixlen 64 scopeid 0x20<link>
ether fa:16:3e:47:9f:5c txqueuelen 1000 (Ethernet)
RX packets 943068 bytes 1306985143 (1.2 GiB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 251634 bytes 18607232 (17.7 MiB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
lo: flags=73<UP,LOOPBACK,RUNNING> mtu 65536
inet 127.0.0.1 netmask 255.0.0.0
inet6 ::1 prefixlen 128 scopeid 0x10<host>
loop txqueuelen 1000 (Local Loopback)
RX packets 64 bytes 10386 (10.1 KiB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 64 bytes 10386 (10.1 KiB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
Node-0004:
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500
inet 192.168.0.143 netmask 255.255.255.0 broadcast 192.168.0.255
inet6 fe80::f816:3eff:fe47:9f2e prefixlen 64 scopeid 0x20<link>
ether fa:16:3e:47:9f:2e txqueuelen 1000 (Ethernet)
RX packets 907974 bytes 1254824909 (1.1 GiB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 249871 bytes 18673933 (17.8 MiB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
lo: flags=73<UP,LOOPBACK,RUNNING> mtu 65536
inet 127.0.0.1 netmask 255.0.0.0
inet6 ::1 prefixlen 128 scopeid 0x10<host>
loop txqueuelen 1000 (Local Loopback)
RX packets 65 bytes 10626 (10.3 KiB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 65 bytes 10626 (10.3 KiB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
三、实验步骤与结果
本章介绍实验的步骤和逐步输出的截图。一共包括了两个子实验:Spark集群搭建与存算分离和基于Spark的影评数据分析算法的实现。
3.1 Spark集群存算分离
本部分实验的整体流程图如下所示:
3.1.1 搭建Spark集群
使用老师提供的安装包spark-2.3.0-bin-without-hadoop.tgz,通过 软件远程传输到ECS主机上;
node-0001节点执行cd、cp等命令,复制安装包到/home/modules目录下,然后解压安装包,如下图所示:
- 在node-0001节点,复制jar包到spark/jar下。然后配置spark配置文件,在spark-env.sh文件末尾添加如下内容:
- 分发Spark,node-0001节点执行scp命令,分发Spark文件到其他3个节点,如下图所示,各个节点均分发完成:
- 对每个节点配置profile,然后source执行。
3.2.1 验证存算分离
- 启动yarn服务,如图所示:可见yarn启动成功。
- 使用之前上传到OBS上的player.txt文件,计算该文件的WordCount。首先启动pyspark,如下图所示说明启动成功:
然后,我们首先输入如下代码,输出如下内容可以看到spark的进程情况:
- 输入WORDCOUNT代码,执行成功输出如下内容。经过和OBS上的文件的对比每个word个数统计正确。说明存算分离成功。
3.2 Spark豆瓣影评分析
本补充部分使用给定的txt文件,基于Spark统计出如下内容:
- 评分最高的前10位电影;
- 每部电影的平均评论人数;
- (可选)每个评分段的电影数。
3.2.1 实验题目分析
本实验给出了一个41K条数据的txt文档,里面的条目包含以下的信息:
表3-1 数据文件解析
属性 | id | name | actor | grade | num |
---|---|---|---|---|---|
含义 | 序号 | 电影名字 | 演员 | 评分 | 评论人数 |
因此,可以使用PySpark的DataFrame数据结构来实现统计。PySpark集成了DataFrame的许多实用功能,可以用来分析数据。
本补充实验的整体流程图如下所示:
3.2.2 数据清洗
需要注意的是,该文件是以csv的形式给出的,所以如果actor有多位且也是使用逗号分隔开的,那么解析时就会出错。对于本次实验,由于不需要用到actor的信息,故将其剔除即可。数据清洗的代码如下所示;
with open('movie.txt', 'r',encoding='utf8') as f: # 读取原始文件的每一行
lines = f.readlines()
new_lines = [] # 新行数组
for line in lines: # 对于每一行,识别半角逗号作为分隔符
row = line.strip().split(',')
new_row = [row[1], row[-2], row[-1]] # 取第2列、倒数第1、2列
new_line = ','.join(new_row)
new_lines.append(new_line+'\n') # 使用,分割,然后成为新的line
with open('new_movies.txt', 'w',encoding='utf8') as f:
f.writelines(new_lines) # 新建文件存储筛选出的列
该程序的功能在于,只筛选出数据文件中的name、grade和num属性,存储到一个清洗文件new_movies.txt
中,也就是筛选出第2列、倒数第1、2列,之后将该文件上传至OBS列表中,使部署在ECS上的HDFS可以访问到即可。
3.2.3 数据读取
首先需要创建一个SparkSession,然后读入文件,将csv转换为数据帧格式,如下所示:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('readfile').getOrCreate() # 创建Spark任务,也可以不写
# 读入文件,将csv转换为数据帧格式
logFilePath = 'obs://bigdatapro-obs/'
df = spark.read.csv(logFilePath,encoding='utf-8',header=True, inferSchema=True,sep=',')
3.2.4 题目一:评分最高的前10位电影
实现代码如下所示:
df1 = df.orderBy("grade",ascending=0) #根据评分字段倒序排序
df2 = df1.select('name','grade') #筛选出name属性
df2.show(10) #只输出前10位
首先使用Spark数据帧的orderBy方法,根据评分字段倒序排序;
然后输出前十位的电影名称即可。
实验输出如下:
3.2.5 题目二:电影评论的平均值
实现代码如下所示:
import pyspark.sql.functions as f
import numpy as np
# 筛选num字段,清洗多余文字
df3=df.select("num", f.regexp_replace(f.col("num"), "人评价", "").alias("commentsNum"))
# describe方法获取平均值
df4=df3.select("commentsNum").describe()
# 筛选出表示平均值的那一行
df5=df4.where(df4["summary"]=="mean")
df5.show()
设计的大致步骤为:
首先筛选num字段,复制出一个commentsNum后续计算;
之后使用describe方法对commentsNum进行多值多方法统计;
最后筛选出表示平均数的那一行进行输出即可。
PS. describe方法可以输出除了平均数以外的多个统计量,包括某一字段的中位数、方差等。
实验输出如下所示:
3.2.6 题目三:评分段电影数目统计
实现代码如下图所示:
allcnt=0 # 用于统计是否正确的标记量
for i in range (0,20): # 筛选和统计
cnt=df.select("name","grade").where((df.grade>0.5*i) & (df.grade<=0.5*(i+1))).count()
print("分数位于(",0.5*i,", ",0.5*i+0.5,"]的数量为: ",cnt,sep="")
allcnt+=cnt
对于0-10的每个分数段,首先筛选出name和评分两个字段,然后使用count方法统计符合分数段区间条件的数据帧数目。需要注意的是,游标 的遍历区间为 ,也就是说对于每个 而言,评分区间左界为 ,右界则为 。
实验输出如下:
到此补充题目的实现完成。
四、实验拓展和展望
本章包括了对于实验所用到知识点的拓展分析,包括ECS服务器、WordCount实现方法拓展与性能对比、Spark和Hadoop的对比分析以及实验心得与体会。具体介绍如下。
4.1 对ECS云服务器的理解
4.1.1 ECS云服务器概述
弹性云服务器Elastic Compute Service(ECS)是华为云等云计算厂商提供的一种基础云计算服务。可以通过远程在高性能主机上控制部署自己的项目。云服务器相对传统的服务器而言,在部署项目时无需提前采购硬件设备,而是根据业务需要,可以随时扩容磁盘、增加带宽。
4.1.2 ECS云服务器的相关知识
下面列出了笔者自行归纳的有关ECS的知识点:
1. 服务模型:IaaS
基础设施即服务。指把IT基础设施作为一种服务通过网络对外提供,并根据用户对资源的实际使用量或占用量进行计费的一种服务模式。ECS提供的主要就是可以自行扩展的服务器,软硬件都需要自行设置。
2. 产品形态:虚拟机、裸金属、DDH等
华为云的裸金属业务单独为BMS,安全性更高。
3. 产品组件:实例、镜像、块存储、快照等
本实验使用的ecs-docker即为云主机实例,创建的docker image即为镜像。
4. 计算架构:x86、ARM等
华为云在此基础上扩展了鲲鹏等业务。
5. 实例类型:通用计算、异构计算、高性能计算等
本实验使用的为鲲鹏通用计算增强型实例。
6. 部署模型:私有云等
本实验的vpc-docker属于私有云部署模型,当然ECS也支持其他部署方式
4.2 WordCount知识点分析与性能对比
本实验中,Spark存算分离使用到了单词计数算法,也就是著名的WordCount来验证PySpark的功能。实际上,Hadoop和Spark均可用于实现这一算法。
4.2.1 基于MapReduce的实现方法
1. Map函数
将源文件中的单词记录信息作为map函数中的(key, value)对,具体而言map的输入是(文件名,文件内容)。map函数将拆分value里的单词并生成中间结果(即<word, 1>),并且为每个输入配对一个output key进行输出。
函数的伪代码如下:
Function Map(input_key, input_value) :
BEGIN
input_key=文件名
input_value=文件内容
for each word w in input_value do
EmitIntermediate (w, "1")
endfor
END
2. reduce函数
Map操作结束后,调用reduce作结果规约操作。将所有和每个output_key配对的中间结果排列成一个表,然后将它们组合成对应于这一output_key的最终结果。
函数的伪代码如下:
Function Reduce(output_key, mid_values) : BEGIN
output_key=指定单词w
mid_values=中间结果列表list
res=0
for each value v in mid_values do
res+=parseInt(v)
endfor
Emit (Turn_to_Str(res))
END
4.2.2 基于Spark的实现方法
本实验采取的就是基于Spark的实现方法,代码已经在实验指导书里给出,因此下面给出算法的流程图分析:
4.2.3 方法对比
通过算法分析与资料查找,可以得出Spark算法在word数巨大时,其效率高于Hadoop很多。具体可以参考下图所示:(图像来源:https://blog.csdn.net/tmac1027/)
可以看出,在数据量很小的时候,Hadoop效率优于Spark。但随着数据集的增大,Spark的算法效率逐渐高于Hadoop,且二者之间的差距逐渐明显。
4.3 Spark和Hadoop的对比
Spark和Hadoop都是分布式计算框架,它们之间有很多相似之处,但也有不同之处。下面是Spark相比于Hadoop的优缺点分析:
4.3.1 优点
(1)速度更快:
相比于Hadoop,Spark采用了基于内存的数据处理方式,可以将数据缓存到内存中并实现流式计算,在一些迭代计算任务中表现更加出色。
(2)更强大的计算引擎:
Spark集成了多种计算引擎(如SQL、MLlib、GraphX等),支持更广泛的应用场景,能够处理更复杂的数据计算任务。
(3)易于开发:
Spark提供了丰富的API和工具支持,包括Scala、Java、Python和R等编程语言,为开发人员提供了更便捷的开发体验。
(4)更好的数据处理性能:
Spark的计算模型和RDD数据结构使得它能够高效地执行各种数据处理操作,从而提高了整体的数据处理性能。
4.3.2 缺点
(1)内存消耗较大:
由于Spark在计算过程中需要缓存大量的数据,因此对于内存资源的消耗比较大,需要有足够的内存容量来支撑其运行。
(2)适用场景有限:
Spark的分布式计算模型适用于大规模数据处理,并且需要频繁地对数据进行计算操作,如机器学习、图计算等领域。对于Hadoop所擅长的批处理等场景,Spark可能并不是最优选择。
(3)对环境要求较高:
Spark对运行环境的要求较高,需要在高性能、高可靠的服务器集群上运行,而这种环境的建设和维护成本也比较高。
总的来说,Spark相比于Hadoop具有更快的处理速度、更强大的计算引擎和更便捷的开发体验等优点,但同时也存在内存消耗较大和适用场景有限等缺点,因此在实际应用中,需要根据具体业务需求来选择和应用不同的技术方案。
4.3.3 Spark为什么比Hadoop快
(1)消除了冗余的HDFS读写:
Hadoop每次shuffle后,必须进行磁盘的读写操作,而Spark可以缓存到内存中以便下一次迭代时使用。因此,随着迭代次数的增加,磁盘读写和内存读写之间的性能差距就会越来越明显。
(2)消除了冗余的Map Reduce阶段
Hadoop的shuffle算法执行的MapReduce一定是完整的,意味着会存在相当的冗余步骤。而Spark基于RDD既可以灵活使用各种算子,又能使用缓存存储reduce的数据。
(3)JVM优化性能
每次进行MapReduce操作时,Hadoop均会启动一个task,从而启动一个JVM基于进程的操作。而Spark的相同操作则是基于线程和内存的,只在启动Excutor之时启动一次JVM。至于task,Spark可将其在线程当中进行重用,没有冗余的进程开销,降低了操作的复杂程度。
- 点赞
- 收藏
- 关注作者
评论(0)