从python编译到运行pyspark样例

举报
泽及天下不为仁 发表于 2021/09/14 14:27:19 2021/09/14
【摘要】 MRS集群默认会带上Python2.7.5和Python3.8.0两个版本的Python。默认使用的是Python2.7.5。但是有时候我们希望使用的是我们指定的Python版本来运行pyspark任务,因此需要自行上传对应的Python版本包。由于Python较依赖环境,不同环境编译出来的Python版本可能并不通用。

MRS集群默认会带上Python2.7.5和Python3.8.0两个版本的Python。默认使用的是Python2.7.5。但是有时候我们希望使用的是我们指定的Python版本来运行pyspark任务,因此需要自行上传对应的Python版本包。由于Python较依赖环境,不同环境编译出来的Python版本可能并不通用。目前遇到过因为libffi和libffi-devel版本不一致导致pyspark运行的时候报错。因此我们在上传Python版本压缩包之前最好在集群系统相同的Linux机器上编译对应的Python。

本文主要介绍从购买华为云ECS到运行pyspark的wordcount样例的过程。

1      准备环境

测试环境是一套MRS_3.1.0普通集群;

希望使用的Python版本是Python3.6.6;

1.1      购买华为云ECS

MRS使用的ECS系统版本一般都是EulerOS2.2或者EulerOS2.5,因此我们购买ECS的时候可以选择2.2或者2.5的EulerOS系统。

如果这个ECS只用于编译Python,需要的资源并不多,可以选择最低规格的ECS。本次测试使用的规格为:通用计算型-2U4G EulerOS2.2

另外最好给ECS配置上EIP,方便访问公网下载Python源码。

购买ECS具体操作参考:https://support.huaweicloud.com/qs-ecs/zh-cn_topic_0030831985.html

1.2      编译Python

使用以下命令安装三方依赖软件:

yum install zlib zlib-devel zip -y

下载对应Python版本源码:

wget https://www.python.org/ftp/python/3.6.6/Python-3.6.6.tgz

解压Python源码:

tar -zxvf Python-3.6.6.tgz

创建安装目录:

mkdir /opt/python36

编译Python

cd Python-3.6.6
./configure --prefix=/opt/python36

出现以下内容表明上述命令执行成功

make -j8

出现以下内容表明上述命令执行成功

make install

出现以下内容表明上述命令执行成功

至此,Python已经安装完成。

1.3      安装任务依赖模块

系统默认已经带上2.7版本Python,需要修改环境变量:

export PYTHON_HOME=/opt/python36
export PATH=$PYTHON_HOME/bin:$PATH

安装三方模块:

pip3 install helloword

本地测试是否安装成功:

使用python3进入python交互界面,执行以下代码:

import helloworld
helloworld.say_hello("test")

如果出现以下内容则说明安装成功:

1.4      打包Python.zip

cd /opt/python36/
zip -r python36.zip ./*

将压缩包发送到需要使用的MRS客户端节点上,我们以客户端节点的/opt目录为存放位置。

2      测试运行

解压文件,配置环境变量:

cd /opt
unzip python36.zip -d python36
export PYSPARK_PYTHON=/opt/python36/bin/python3

上传压缩包到HDFS上

hdfs dfs -mkdir /user/python
hdfs dfs -put python36.zip /user/python

2.1      本地运行pyspark

使用pyspark启动local模式的交互界面,执行以下代码测试三方包是否生效:

>>> import helloworld
Hello, Sara!
>>> helloworld.say_hello("test")
'Hello, test!'

测试执行sql是否正常:

spark.sql("show tables").show()
spark.sql("select count(*) from test1").show()

2.2      pyspark on yarn client模式

cd /opt
pyspark --master yarn --deploy-mode client \
  --conf spark.pyspark.python=./python36.zip/bin/python3 \
  --conf spark.pyspark.driver.python=/opt/python36/bin/python3 \
  --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip

因为client模式下driver是在客户端侧运行,因此需要对driver的python环境单独指定:

spark.pyspark.driver.python=/opt/python36/bin/python3

同样使用上一步的代码测试功能是否正常

增加测试executor是否拿到三方模块检查(/tmp/log1.txt是一个存放在hdfs上面的文本文件,内容不限定):

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
inputPath = "/tmp/log1.txt"
lines = sc.textFile(name = inputPath)
words = lines.flatMap(lambda line:line.split(" "),True)
pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True)
result = pairWords.reduceByKey(lambda v1,v2:v1+v2)
result.foreach(lambda t :print(t))

可以看到executor日志里面打印了相关信息:

将最终结果打印到控制台:

output=result.collect()
print(output)
for (word, count) in output:
  print(word,count)

2.3      spark-submit on yarn client模式

将上面的测试命令写到test.py脚本里面:

# -*- coding: utf-8 -*

import helloworld

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    helloworld.say_hello("test")
    #创建SparkConf
    conf = SparkConf().setAppName("wordcount")
    #创建SparkContext  注意参数要传递conf=conf
    sc = SparkContext(conf=conf)
    inputPath = "/tmp/log1.txt"
    lines = sc.textFile(name = inputPath)
    #每一行数据按照空格拆分  得到一个个单词
    words = lines.flatMap(lambda line:line.split(" "),True)
    #将每个单词 组装成一个tuple 计数1
    pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True)
    #reduceByKey进行汇总
    result = pairWords.reduceByKey(lambda v1,v2:v1+v2)
    #executor上打印结果
    result.foreach(lambda t :print(t))
    #搜集所有结果
    output = result.collect()
    #打印汇总结果
    print(output)
    #分开打印结果
    for (word, count) in output:
        print(word,count)
    #退出任务
    sc.stop()

启动命令:

spark-submit --master yarn --deploy-mode client \
  --conf spark.pyspark.python=./Python/bin/python3 \
  --conf spark.pyspark.driver.python=/opt/python36/bin/python3 \
  --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \
  test.py

查看executor日志和控制台打印内容,确认结果有被打印。

2.4      spark-submit on yarn cluster模式

依旧使用上面的test.py脚本运行任务。

启动命令:

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.pyspark.python=./Python/bin/python3 \
  --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \
  test.py

查看executor日志和driver日志打印内容,确认结果有被打印。

3      结论

至此所有操作步骤都执行完成。关键操作就是编译Python和spark on yarn的client与cluster模式下driver的Python环境配置。

 

 

 

参考文档:

https://bbs.huaweicloud.com/blogs/168935

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。