基于华为云平台搭建Spark集群并进行影评数据分析

举报
yd_249455101 发表于 2023/11/11 23:55:38 2023/11/11
【摘要】 本文章是对之前云计算的Spark实验的一个记录与总结,同时也包括了自己在实验过程中的一些感想。本次实验为云计算的Spark实验,实验在华为云平台上进行。实验进行过程中,首先在华为云平台上购买了4台服务器组成的云服务器集群和OBS服务;然后,使用PuTTY分别登陆各个主机并且配置网络和ssh免密登录,对Hadoop集群进行调配;随后,配置OBS服务并且测试OBS服务与Hadoop集群的连接。随...

本文章是对之前云计算的Spark实验的一个记录与总结,同时也包括了自己在实验过程中的一些感想。

本次实验为云计算的Spark实验,实验在华为云平台上进行。实验进行过程中,首先在华为云平台上购买了4台服务器组成的云服务器集群和OBS服务;然后,使用PuTTY分别登陆各个主机并且配置网络和ssh免密登录,对Hadoop集群进行调配;随后,配置OBS服务并且测试OBS服务与Hadoop集群的连接。随后,在搭建好的Hadoop集群上配置Spark,使用pyspark验证存算分离。最后,本次实验进行了一个实际案例分析,即对于给出的4MB大小的豆瓣影片评分数据进行在华为云平台上进行数据分析,完成对影评数据的读取、清洗和分析等操作。

实验环境

本次实验在华为云平台上进行,购买了4台云服务器以搭建Hadoop集群。服务器的相关配置如下:

规格 鲲鹏通用计算增强型 | kc1.2xlarge.4 | 8vCPUs | 32GiB
镜像 CentOS 7.6 64bit with ARM | 公共镜像
云硬盘 超高IO | 200 GiB

​ 此外,本次实验还使用了OBS作为文件存储服务为Hadoop集群提供存储支持。其相关配置如下:

存储类别 标准存储
数据冗余策略 多AZ存储

实验原理

本实验通过购买华为云OBS和华为云ECS服务,提供后续集群的搭建基础。本实验的基本步骤包含:购买并配置ECS;购买OBS并获取AK、SK信息;随后,在安装完毕并配置好Hadoop的基础上,安装Spark集群,并使Spark能够读取OBS数据,通过该实验使Spark集群能够实现存算分离,提高计算性能。

实验步骤

准备华为云环境

购买华为云ECS

浏览器登录华为云。进入控制台,选择弹性云服务器ECS,点击“控制台”。点击框线处,展开“服务列表”,选择“弹性云服务器ECS”,选择购买弹性云服务器ECS。

按照下图进行基础配置。

按照下图进行网络配置。

按照下图进行高级配置。

随后检查配置,勾选我已阅读并同意免责声明,立即购买。

完成购买后,查看已购买的所有ECS。

购买OBS

在“服务列表”选取“对象存储服务OBS”。进入OBS服务后,点击右侧“创建桶”。参照下图购买OBS。

创建并行文件系统。选择“并行文件系统”,点击右上角“创建并行文件系统”,其他配置默认,保存创建即可。

进入创建的OBS桶,复制Endpoint参数,保存到本地文档。

获取AK/SK,点击华为云页面右上角“用户名”,下拉选择“我的凭证”,点击“访问秘钥”。点击新增访问秘钥,根据提示进行操作。待操作完成后,得到文件“credentials.csv”,打开即可得到AK/SK。

搭建Hadoop集群

Hadoop集群搭建

配置ECS

使用putty或mobaxterm登录ECS。

下载安装包。

下载hadoop安装包、OBSFileSystem相关jar包和Openjdk。由于从外网下载速度较慢,为保证能在规定时限内完成实验,因此这里我将课程资源内的实验文件的相关安装包通过scp命令从本机传输至云主机。经过md5校验,确认实验文件中的安装包和官网的安装包是一致的。

配置/etc/hosts文件。查看ECS列表各节点IP。

各节点执行vim /etc/hosts命令,添加以下内容,保存退出。(在实验过程中,node-0001出现未知问题,无法免密登录其他节点。在重置系统之后问题依然没有解决,为了确保在代金券有效范围内能够及时完成实验,因此之后的实验只使用node-0002~node-0004三个节点进行集群搭建;其中重新约定node-0002为主节点)

配置节点互信。各节点执行以下命令,连续回车生成/root/.ssh/id_rsa.pub文件;

各节点执行cat /root/.ssh/id_rsa.pub,输出如下内容。复制该命令在各节点的输出内容。

各节点执行vim /root/.ssh/authorized_keys命令,输入各节点的复制内容,保存退出。

各节点执行:ssh node-0001~node-0004,选择yes后,确保能够无密码跳转到目标节点。

确定JDK是否安装

确认是否已安装JDK。各节点执行命令,输出如下结果:

搭建Hadoop集群

搭建Hadoop集群

创建目录。各节点执行:

$ mkdir -p /home/modules/data/buf
$ mkdir -p /home/nm/localdir

登录node-0002节点,解压hadoop安装包

配置hadoop core-site.xml配置文件。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/core-site.xml。参数配置如下:

配置hdfs-site.xml。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/hdfs-site.xml。参数配置如下:

配置yarn-site.xml。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/yarn-site.xml。参数配置如下:

配置mapred-site.xml。node-0002节点执行下列命令:

$ cd /home/modules/hadoop-2.0.3/etc/hadoop
$ mv mapred-site.xml.template mapred-site.xml

参数配置如下:

配置slaves。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/slaves,删除原有内容,添加内容如下:

配置hadoop环境变量。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/hadoop-env.sh。添加如下内容:

配置jar包。node-0002节点执行下列命令:

$ cd /root
$ cp hadoop-huaweicloud-2.0.3-hw-39.jar /home/modules/hadoop-2.0.3/share/hadoop/common/lib/
$ cp hadoop-huaweicloud-2.0.3-hw-39.jar /home/modules/hadoop-2.0.3/share/hadoop/tools/lib
$ cp hadoop-huaweicloud-2.0.3-hw-39.jar /home/modules/hadoop-2.0.3/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/
$ cp hadoop-huaweicloud-2.0.3-hw-39.jar /home/modules/hadoop-2.0.3/share/hadoop/hdfs/lib/

分发hadoop包到各节点。node-0002下执行下列命令:

配置环境变量。各节点执行vim /etc/profile,在文件尾部添加如下内容并执行source /etc/profile

namenode初始化。node-0002节点执行namenode初始化,执行下列命令:

初始化成功后,启动hdfs。

启动HDFS。

执行hdfs命令。

测试与OBS互联

在OBS上传文件。进入OBS桶,选择“对象”上传文件。可查看到OBS文件上传成功。

执行hdfs命令hdfs dfs -ls obs://bigdatapro-obs-tjubkt/查看OBS文件。Hadoop集群与OBS互联成功。

Spark集群搭建

Spark集群存算分离

搭建Spark集群

获取spark安装包,node-0002节点下载Spark安装包。由于从外网下载速度较慢,因此这里我将课程资源内的spark实验文件的相关安装包通过scp命令从本机传输至云主机。经过md5校验,确认实验文件中的安装包和官网的安装包是一致的。

解压spark安装包。node-0002节点执行下列命令,复制安装包到/home/modules目录下

$ cp /root/spark-2.3.0-bin-without-hadoop.tgz /home/modules
$ cd /home/modules

解压安装包:

配置spark jar包。在node-0002节点,复制jar包到spark/jar下:

配置spark配置文件,node-0002节点执行下列命令:

文件末尾添加如下内容:

分发Spark。node-0002节点执行下列命令:

配置环境变量。各节点执行vim /etc/profile,添加如下内容,保存退出。再执行source /etc/profile,使环境变量生效。

验证存算分离

查看要计算的文件,本次实验验证Spark与OBS实现存算分离,数据使用实验2.3.2中上传的playerinfo.txt文件。

计算上述数据的wordcount。

启动yarn。

$ vim /etc/profile
$ source /etc/profile
$ start-yarn.sh

启动pyspark。

输入下列代码:

lines = spark.read.text("obs://bigdatapro-obs/").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
output = counts.collect()
for (word, count) in output:
      print("%s: %i" % (word, count))

执行成功输出如下结果:

存算分离计算成功!

影评数据分析

在配置好spark的基础上,使用spark分析数据,计算以下内容:

  1. 输出评分最高的前十个电影的电影名

  2. 输出平均每部电影有多少人进行评价

  3. 输出每个分段的电影各有多少部(例:9.5~10.0: 7部,8.5~9.0: 10部,仅举例)。程序代码如下所示:

from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, col, avg, floor
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

appname = 'douban-movie'
master = 'local[*]'
path = 'movie.csv'

if __name__ == '__main__':
	# 获取SparkSession,以便在后续进行操作
	spark = SparkSession.builder \
	.master(master) \
	.appName(appname) \
	.getOrCreate()
	sc = spark.sparkContext

	# 定义movie.csv的文件结构
	movieSchema = StructType([
		StructField('id', IntegerType(), False),      # id 字段为整数,不可为空
		StructField('name', StringType(), False),     # name 影片名 字段为字符串,不可为空
		StructField('director', StringType(), False), # director 导演 字段为字符串,不可为空
		StructField('grade', DoubleType(), False),    # grade 评分 字段为实数,不可为空
		StructField('num', IntegerType(), False)      # num 评分人数 字段为整数,不可为空
  ]) 

	# 从csv文件中读取数据
	#	读取文件时第一行作为表头
	#	对于不符合文件结构schema的记录,直接丢弃
	df = spark.read.format('csv') \
		.option('header', True) \
		.option('mode', 'dropMalformed') \
		.schema(movieSchema) \
		.load('movie.csv')

    # 进一步进行预处理。根据爬取数据,评分grade应该在0.0至10.0之间,评分人数num应该为正数
	df = df.where(col('grade') > 0.0).where(col('grade') < 10.0).where(col('num') > 0)

    # 打印经过预处理之后剩余的总记录数
	print(df.count())

	# 解决问题1: 输出评分最高的前十个电影的电影名
	#	将电影按照grade降序排序,对于同评分电影,按照name升序排序
	#	选择name列,打印10行
	df.orderBy(desc('grade'), col('name')) \
		.select(col('name')) \
		.show(10, truncate=False)

	# 解决问题2:输出平均每部电影有多少人评价
	#	对num列求平均,记为`mean_num`列
	df.select(avg('num').alias('mean_num')) \
		.show()

	# 解决问题3:输出各个分段的电影有多少部
	#	首先计算grade_group, 即每个电影按照评分所落入的组号. 0.1~0.5落入组0, 0.6~1.0落入组1, 以此类推。
	#	按照grade_group进行分组,在组内计算该组有多少部电影
	#	计算每个组评分的下限lower_bound和评分的上限upper_bound, 以及组内的电影数count
	#	最后按照评分从低到高进行排序即可
	df.withColumn('grade_group', floor((col('grade') - 0.1) / 0.5)) \
		.groupBy(col('grade_group')).count() \
		.select((col('grade_group')*0.5+0.1).alias('lower_bound'), (col('grade_group')*0.5+0.5).alias('upper_bound'), 'count') \
		.orderBy('lower_bound') \
		.show()

首先将提供的movie.txt转换为movie.csv格式,然后进行初步的预处理。在原数据中,存在一些不合法的数据,例如在电影名称中包含有英文逗号以及包括多个导演的数据,这些数据很难通过自动手段进行处理,只能手动划分或者直接去除。因此,在导入DataFrame时,这些数据通过给定schema和dropMalformed策略被过滤掉。随后在下面通过对grade和num的条件筛选过滤掉剩余的不符合规则的数据。

在过滤完成后,剩余的有效数据数为37563条。

题目1的实验结果如下所示。由于存在同分的影片,这里是按照相同评分的情况下按照影片名称升序排列进行的。

题目2的实验结果如下所示。平均每部电影有约7575个人进行评价。

题目3的实验结果如下所示。在这里,首先对每部电影根据其评分分配一个分段(grade_group)(0.1-0.5分落入分段0, 0.6-1.0分落入分段1, 1.1-1.5分落入分段2,以此类推)然后按照分段进行分组,得到每个分段内的影片数。这里可以发现,在提供的数据中,豆瓣没有2分以下的影片;影片的分布呈现钟形分布——5-7分的电影数最多,分数偏高的电影和分数偏低的电影相对都比较少。

释放试验资源

删除ECS弹性云服务器

进入ECS列表,点击全选按钮,点击“更多 -> 删除”。在对话框中选择“释放云服务器绑定的公网IP地址”和“删除云服务器挂载的数据盘”,点击“是”。

删除OBS对象存储

进入OBS服务的并行文件系统列表,点击创建的文件系统。选择左侧“文件”标签,在右侧全选所有文件,点击“删除”并确认。返回并行文件系统列表,在对应的文件系统右侧点击“删除”并确认。

资源检查

在控制台点击“更多 | 资源 | 我的资源”菜单项,检查资源是否全部删除。

总结展望

如今的企业在其业务中也已经普遍地使用Spark进行大数据分析。Twitter使用Spark进行实时数据分析、热搜话题检测,并将其应用于推荐系统和机器学习。Spark帮助Twitter从数以十亿记的推文、用户、话题和事件中抽取高价值的信息并应用于公司未来的业务发展之中;Netflix使用Spark来构建其推荐系统、个性化推送、欺诈行为检测和机器学习任务,Netflix通过此方式对影评和评分进行大规模的分析预测,来帮助其对未来的影视和投资方向进行高效的预测;Intel在其工艺生产线中部署Spark来进行大规模的数据处理,其范围涵盖了调研、开发、测试、生产和市场分析等一系列方面,Intel还使用Spark来分析传感器数据、工作日志、芯片设计、芯片工作情况统计,依据此来改进其未来的研发方向。总的来说,Spark能够快速、可扩展和灵活地处理超大型数据集,用于数据分析、机器学习、流媒体、SQL、图形方面等,在生产实际中起到了极大的作用。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。