基于华为云平台搭建Spark集群并进行影评数据分析
本文章是对之前云计算的Spark实验的一个记录与总结,同时也包括了自己在实验过程中的一些感想。
本次实验为云计算的Spark实验,实验在华为云平台上进行。实验进行过程中,首先在华为云平台上购买了4台服务器组成的云服务器集群和OBS服务;然后,使用PuTTY分别登陆各个主机并且配置网络和ssh免密登录,对Hadoop集群进行调配;随后,配置OBS服务并且测试OBS服务与Hadoop集群的连接。随后,在搭建好的Hadoop集群上配置Spark,使用pyspark验证存算分离。最后,本次实验进行了一个实际案例分析,即对于给出的4MB大小的豆瓣影片评分数据进行在华为云平台上进行数据分析,完成对影评数据的读取、清洗和分析等操作。
实验环境
本次实验在华为云平台上进行,购买了4台云服务器以搭建Hadoop集群。服务器的相关配置如下:
规格 | 鲲鹏通用计算增强型 | kc1.2xlarge.4 | 8vCPUs | 32GiB |
---|---|
镜像 | CentOS 7.6 64bit with ARM | 公共镜像 |
云硬盘 | 超高IO | 200 GiB |
此外,本次实验还使用了OBS作为文件存储服务为Hadoop集群提供存储支持。其相关配置如下:
存储类别 | 标准存储 |
---|---|
数据冗余策略 | 多AZ存储 |
实验原理
本实验通过购买华为云OBS和华为云ECS服务,提供后续集群的搭建基础。本实验的基本步骤包含:购买并配置ECS;购买OBS并获取AK、SK信息;随后,在安装完毕并配置好Hadoop的基础上,安装Spark集群,并使Spark能够读取OBS数据,通过该实验使Spark集群能够实现存算分离,提高计算性能。
实验步骤
准备华为云环境
购买华为云ECS
浏览器登录华为云。进入控制台,选择弹性云服务器ECS,点击“控制台”。点击框线处,展开“服务列表”,选择“弹性云服务器ECS”,选择购买弹性云服务器ECS。
按照下图进行基础配置。
按照下图进行网络配置。
按照下图进行高级配置。
随后检查配置,勾选我已阅读并同意免责声明,立即购买。
完成购买后,查看已购买的所有ECS。
购买OBS
在“服务列表”选取“对象存储服务OBS”。进入OBS服务后,点击右侧“创建桶”。参照下图购买OBS。
创建并行文件系统。选择“并行文件系统”,点击右上角“创建并行文件系统”,其他配置默认,保存创建即可。
进入创建的OBS桶,复制Endpoint参数,保存到本地文档。
获取AK/SK,点击华为云页面右上角“用户名”,下拉选择“我的凭证”,点击“访问秘钥”。点击新增访问秘钥,根据提示进行操作。待操作完成后,得到文件“credentials.csv”,打开即可得到AK/SK。
搭建Hadoop集群
Hadoop集群搭建
配置ECS
使用putty或mobaxterm登录ECS。
下载安装包。
下载hadoop安装包、OBSFileSystem相关jar包和Openjdk。由于从外网下载速度较慢,为保证能在规定时限内完成实验,因此这里我将课程资源内的实验文件的相关安装包通过scp命令从本机传输至云主机。经过md5校验,确认实验文件中的安装包和官网的安装包是一致的。
配置/etc/hosts文件。查看ECS列表各节点IP。
各节点执行vim /etc/hosts
命令,添加以下内容,保存退出。(在实验过程中,node-0001出现未知问题,无法免密登录其他节点。在重置系统之后问题依然没有解决,为了确保在代金券有效范围内能够及时完成实验,因此之后的实验只使用node-0002~node-0004三个节点进行集群搭建;其中重新约定node-0002为主节点)
配置节点互信。各节点执行以下命令,连续回车生成/root/.ssh/id_rsa.pub文件;
各节点执行cat /root/.ssh/id_rsa.pub
,输出如下内容。复制该命令在各节点的输出内容。
各节点执行vim /root/.ssh/authorized_keys
命令,输入各节点的复制内容,保存退出。
各节点执行:ssh node-0001~node-0004,选择yes后,确保能够无密码跳转到目标节点。
确定JDK是否安装
确认是否已安装JDK。各节点执行命令,输出如下结果:
搭建Hadoop集群
搭建Hadoop集群
创建目录。各节点执行:
$ mkdir -p /home/modules/data/buf
$ mkdir -p /home/nm/localdir
登录node-0002节点,解压hadoop安装包
配置hadoop core-site.xml配置文件。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/core-site.xml
。参数配置如下:
配置hdfs-site.xml。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/hdfs-site.xml
。参数配置如下:
配置yarn-site.xml。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/yarn-site.xml
。参数配置如下:
配置mapred-site.xml。node-0002节点执行下列命令:
$ cd /home/modules/hadoop-2.0.3/etc/hadoop
$ mv mapred-site.xml.template mapred-site.xml
参数配置如下:
配置slaves。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/slaves
,删除原有内容,添加内容如下:
配置hadoop环境变量。node-0002节点执行下列命令:vim /home/modules/hadoop-2.8.3/etc/hadoop/hadoop-env.sh
。添加如下内容:
配置jar包。node-0002节点执行下列命令:
$ cd /root
$ cp hadoop-huaweicloud-2.0.3-hw-39.jar /home/modules/hadoop-2.0.3/share/hadoop/common/lib/
$ cp hadoop-huaweicloud-2.0.3-hw-39.jar /home/modules/hadoop-2.0.3/share/hadoop/tools/lib
$ cp hadoop-huaweicloud-2.0.3-hw-39.jar /home/modules/hadoop-2.0.3/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/
$ cp hadoop-huaweicloud-2.0.3-hw-39.jar /home/modules/hadoop-2.0.3/share/hadoop/hdfs/lib/
分发hadoop包到各节点。node-0002下执行下列命令:
配置环境变量。各节点执行vim /etc/profile
,在文件尾部添加如下内容并执行source /etc/profile
:
namenode初始化。node-0002节点执行namenode初始化,执行下列命令:
初始化成功后,启动hdfs。
启动HDFS。
执行hdfs命令。
测试与OBS互联
在OBS上传文件。进入OBS桶,选择“对象”上传文件。可查看到OBS文件上传成功。
执行hdfs命令hdfs dfs -ls obs://bigdatapro-obs-tjubkt/
查看OBS文件。Hadoop集群与OBS互联成功。
Spark集群搭建
Spark集群存算分离
搭建Spark集群
获取spark安装包,node-0002节点下载Spark安装包。由于从外网下载速度较慢,因此这里我将课程资源内的spark实验文件的相关安装包通过scp命令从本机传输至云主机。经过md5校验,确认实验文件中的安装包和官网的安装包是一致的。
解压spark安装包。node-0002节点执行下列命令,复制安装包到/home/modules目录下
$ cp /root/spark-2.3.0-bin-without-hadoop.tgz /home/modules
$ cd /home/modules
解压安装包:
配置spark jar包。在node-0002节点,复制jar包到spark/jar下:
配置spark配置文件,node-0002节点执行下列命令:
文件末尾添加如下内容:
分发Spark。node-0002节点执行下列命令:
配置环境变量。各节点执行vim /etc/profile
,添加如下内容,保存退出。再执行source /etc/profile
,使环境变量生效。
验证存算分离
查看要计算的文件,本次实验验证Spark与OBS实现存算分离,数据使用实验2.3.2中上传的playerinfo.txt文件。
计算上述数据的wordcount。
启动yarn。
$ vim /etc/profile
$ source /etc/profile
$ start-yarn.sh
启动pyspark。
输入下列代码:
lines = spark.read.text("obs://bigdatapro-obs/").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
执行成功输出如下结果:
存算分离计算成功!
影评数据分析
在配置好spark的基础上,使用spark分析数据,计算以下内容:
输出评分最高的前十个电影的电影名
输出平均每部电影有多少人进行评价
输出每个分段的电影各有多少部(例:9.5~10.0: 7部,8.5~9.0: 10部,仅举例)。程序代码如下所示:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, col, avg, floor
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
appname = 'douban-movie'
master = 'local[*]'
path = 'movie.csv'
if __name__ == '__main__':
# 获取SparkSession,以便在后续进行操作
spark = SparkSession.builder \
.master(master) \
.appName(appname) \
.getOrCreate()
sc = spark.sparkContext
# 定义movie.csv的文件结构
movieSchema = StructType([
StructField('id', IntegerType(), False), # id 字段为整数,不可为空
StructField('name', StringType(), False), # name 影片名 字段为字符串,不可为空
StructField('director', StringType(), False), # director 导演 字段为字符串,不可为空
StructField('grade', DoubleType(), False), # grade 评分 字段为实数,不可为空
StructField('num', IntegerType(), False) # num 评分人数 字段为整数,不可为空
])
# 从csv文件中读取数据
# 读取文件时第一行作为表头
# 对于不符合文件结构schema的记录,直接丢弃
df = spark.read.format('csv') \
.option('header', True) \
.option('mode', 'dropMalformed') \
.schema(movieSchema) \
.load('movie.csv')
# 进一步进行预处理。根据爬取数据,评分grade应该在0.0至10.0之间,评分人数num应该为正数
df = df.where(col('grade') > 0.0).where(col('grade') < 10.0).where(col('num') > 0)
# 打印经过预处理之后剩余的总记录数
print(df.count())
# 解决问题1: 输出评分最高的前十个电影的电影名
# 将电影按照grade降序排序,对于同评分电影,按照name升序排序
# 选择name列,打印10行
df.orderBy(desc('grade'), col('name')) \
.select(col('name')) \
.show(10, truncate=False)
# 解决问题2:输出平均每部电影有多少人评价
# 对num列求平均,记为`mean_num`列
df.select(avg('num').alias('mean_num')) \
.show()
# 解决问题3:输出各个分段的电影有多少部
# 首先计算grade_group, 即每个电影按照评分所落入的组号. 0.1~0.5落入组0, 0.6~1.0落入组1, 以此类推。
# 按照grade_group进行分组,在组内计算该组有多少部电影
# 计算每个组评分的下限lower_bound和评分的上限upper_bound, 以及组内的电影数count
# 最后按照评分从低到高进行排序即可
df.withColumn('grade_group', floor((col('grade') - 0.1) / 0.5)) \
.groupBy(col('grade_group')).count() \
.select((col('grade_group')*0.5+0.1).alias('lower_bound'), (col('grade_group')*0.5+0.5).alias('upper_bound'), 'count') \
.orderBy('lower_bound') \
.show()
首先将提供的movie.txt转换为movie.csv格式,然后进行初步的预处理。在原数据中,存在一些不合法的数据,例如在电影名称中包含有英文逗号以及包括多个导演的数据,这些数据很难通过自动手段进行处理,只能手动划分或者直接去除。因此,在导入DataFrame时,这些数据通过给定schema和dropMalformed策略被过滤掉。随后在下面通过对grade和num的条件筛选过滤掉剩余的不符合规则的数据。
在过滤完成后,剩余的有效数据数为37563条。
题目1的实验结果如下所示。由于存在同分的影片,这里是按照相同评分的情况下按照影片名称升序排列进行的。
题目2的实验结果如下所示。平均每部电影有约7575个人进行评价。
题目3的实验结果如下所示。在这里,首先对每部电影根据其评分分配一个分段(grade_group)(0.1-0.5分落入分段0, 0.6-1.0分落入分段1, 1.1-1.5分落入分段2,以此类推)然后按照分段进行分组,得到每个分段内的影片数。这里可以发现,在提供的数据中,豆瓣没有2分以下的影片;影片的分布呈现钟形分布——5-7分的电影数最多,分数偏高的电影和分数偏低的电影相对都比较少。
释放试验资源
删除ECS弹性云服务器
进入ECS列表,点击全选按钮,点击“更多 -> 删除”。在对话框中选择“释放云服务器绑定的公网IP地址”和“删除云服务器挂载的数据盘”,点击“是”。
删除OBS对象存储
进入OBS服务的并行文件系统列表,点击创建的文件系统。选择左侧“文件”标签,在右侧全选所有文件,点击“删除”并确认。返回并行文件系统列表,在对应的文件系统右侧点击“删除”并确认。
资源检查
在控制台点击“更多 | 资源 | 我的资源”菜单项,检查资源是否全部删除。
总结展望
如今的企业在其业务中也已经普遍地使用Spark进行大数据分析。Twitter使用Spark进行实时数据分析、热搜话题检测,并将其应用于推荐系统和机器学习。Spark帮助Twitter从数以十亿记的推文、用户、话题和事件中抽取高价值的信息并应用于公司未来的业务发展之中;Netflix使用Spark来构建其推荐系统、个性化推送、欺诈行为检测和机器学习任务,Netflix通过此方式对影评和评分进行大规模的分析预测,来帮助其对未来的影视和投资方向进行高效的预测;Intel在其工艺生产线中部署Spark来进行大规模的数据处理,其范围涵盖了调研、开发、测试、生产和市场分析等一系列方面,Intel还使用Spark来分析传感器数据、工作日志、芯片设计、芯片工作情况统计,依据此来改进其未来的研发方向。总的来说,Spark能够快速、可扩展和灵活地处理超大型数据集,用于数据分析、机器学习、流媒体、SQL、图形方面等,在生产实际中起到了极大的作用。
- 点赞
- 收藏
- 关注作者
评论(0)