从python编译到运行pyspark样例
MRS集群默认会带上Python2.7.5和Python3.8.0两个版本的Python。默认使用的是Python2.7.5。但是有时候我们希望使用的是我们指定的Python版本来运行pyspark任务,因此需要自行上传对应的Python版本包。由于Python较依赖环境,不同环境编译出来的Python版本可能并不通用。目前遇到过因为libffi和libffi-devel版本不一致导致pyspark运行的时候报错。因此我们在上传Python版本压缩包之前最好在集群系统相同的Linux机器上编译对应的Python。
本文主要介绍从购买华为云ECS到运行pyspark的wordcount样例的过程。
1 准备环境
测试环境是一套MRS_3.1.0普通集群;
希望使用的Python版本是Python3.6.6;
1.1 购买华为云ECS
MRS使用的ECS系统版本一般都是EulerOS2.2或者EulerOS2.5,因此我们购买ECS的时候可以选择2.2或者2.5的EulerOS系统。
如果这个ECS只用于编译Python,需要的资源并不多,可以选择最低规格的ECS。本次测试使用的规格为:通用计算型-2U4G EulerOS2.2
另外最好给ECS配置上EIP,方便访问公网下载Python源码。
购买ECS具体操作参考:https://support.huaweicloud.com/qs-ecs/zh-cn_topic_0030831985.html
1.2 编译Python
使用以下命令安装三方依赖软件:
yum install zlib zlib-devel zip -y
下载对应Python版本源码:
wget https://www.python.org/ftp/python/3.6.6/Python-3.6.6.tgz
解压Python源码:
tar -zxvf Python-3.6.6.tgz
创建安装目录:
mkdir /opt/python36
编译Python:
cd Python-3.6.6
./configure --prefix=/opt/python36
出现以下内容表明上述命令执行成功
make -j8
出现以下内容表明上述命令执行成功
make install
出现以下内容表明上述命令执行成功
至此,Python已经安装完成。
1.3 安装任务依赖模块
系统默认已经带上2.7版本Python,需要修改环境变量:
export PYTHON_HOME=/opt/python36
export PATH=$PYTHON_HOME/bin:$PATH
安装三方模块:
pip3 install helloword
本地测试是否安装成功:
使用python3进入python交互界面,执行以下代码:
import helloworld
helloworld.say_hello("test")
如果出现以下内容则说明安装成功:
1.4 打包Python.zip
cd /opt/python36/
zip -r python36.zip ./*
将压缩包发送到需要使用的MRS客户端节点上,我们以客户端节点的/opt目录为存放位置。
2 测试运行
解压文件,配置环境变量:
cd /opt
unzip python36.zip -d python36
export PYSPARK_PYTHON=/opt/python36/bin/python3
上传压缩包到HDFS上
hdfs dfs -mkdir /user/python
hdfs dfs -put python36.zip /user/python
2.1 本地运行pyspark
使用pyspark启动local模式的交互界面,执行以下代码测试三方包是否生效:
>>> import helloworld
Hello, Sara!
>>> helloworld.say_hello("test")
'Hello, test!'
测试执行sql是否正常:
spark.sql("show tables").show()
spark.sql("select count(*) from test1").show()
2.2 pyspark on yarn client模式
cd /opt
pyspark --master yarn --deploy-mode client \
--conf spark.pyspark.python=./python36.zip/bin/python3 \
--conf spark.pyspark.driver.python=/opt/python36/bin/python3 \
--conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip
因为client模式下driver是在客户端侧运行,因此需要对driver的python环境单独指定:
spark.pyspark.driver.python=/opt/python36/bin/python3
同样使用上一步的代码测试功能是否正常
增加测试executor是否拿到三方模块检查(/tmp/log1.txt是一个存放在hdfs上面的文本文件,内容不限定):
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
inputPath = "/tmp/log1.txt"
lines = sc.textFile(name = inputPath)
words = lines.flatMap(lambda line:line.split(" "),True)
pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True)
result = pairWords.reduceByKey(lambda v1,v2:v1+v2)
result.foreach(lambda t :print(t))
可以看到executor日志里面打印了相关信息:
将最终结果打印到控制台:
output=result.collect()
print(output)
for (word, count) in output:
print(word,count)
2.3 spark-submit on yarn client模式
将上面的测试命令写到test.py脚本里面:
# -*- coding: utf-8 -*
import helloworld
from pyspark import SparkConf, SparkContext
if __name__ == "__main__":
helloworld.say_hello("test")
#创建SparkConf
conf = SparkConf().setAppName("wordcount")
#创建SparkContext 注意参数要传递conf=conf
sc = SparkContext(conf=conf)
inputPath = "/tmp/log1.txt"
lines = sc.textFile(name = inputPath)
#每一行数据按照空格拆分 得到一个个单词
words = lines.flatMap(lambda line:line.split(" "),True)
#将每个单词 组装成一个tuple 计数1
pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True)
#reduceByKey进行汇总
result = pairWords.reduceByKey(lambda v1,v2:v1+v2)
#executor上打印结果
result.foreach(lambda t :print(t))
#搜集所有结果
output = result.collect()
#打印汇总结果
print(output)
#分开打印结果
for (word, count) in output:
print(word,count)
#退出任务
sc.stop()
启动命令:
spark-submit --master yarn --deploy-mode client \
--conf spark.pyspark.python=./Python/bin/python3 \
--conf spark.pyspark.driver.python=/opt/python36/bin/python3 \
--conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \
test.py
查看executor日志和控制台打印内容,确认结果有被打印。
2.4 spark-submit on yarn cluster模式
依旧使用上面的test.py脚本运行任务。
启动命令:
spark-submit --master yarn --deploy-mode cluster \
--conf spark.pyspark.python=./Python/bin/python3 \
--conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \
test.py
查看executor日志和driver日志打印内容,确认结果有被打印。
3 结论
至此所有操作步骤都执行完成。关键操作就是编译Python和spark on yarn的client与cluster模式下driver的Python环境配置。
参考文档:
- 点赞
- 收藏
- 关注作者
评论(0)