【Spark】如何在Spark Scala/Java应用中调用Python脚本

小兔子615 发表于 2021/10/30 15:46:47 2021/10/30
【摘要】 本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同 1.PythonRunner对于运行与 JVM 上的程序(即Scala、Java程序),Spark 提供了 PythonRunner 类。只需要调用PythonRunner 的main方法,就可以在Scala或Java程序中调用Python脚本。在实现上,PythonRun...

本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同

1.PythonRunner

对于运行与 JVM 上的程序(即Scala、Java程序),Spark 提供了 PythonRunner 类。只需要调用PythonRunner 的main方法,就可以在Scala或Java程序中调用Python脚本。在实现上,PythonRunner 基于py4j ,通过构造GatewayServer实例让python程序通过本地网络socket来与JVM通信。

// Launch a Py4J gateway server for the process to connect to; this will let it see our
    // Java system properties and such
    val localhost = InetAddress.getLoopbackAddress()
    val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
      .authToken(secret)
      .javaPort(0)
      .javaAddress(localhost)
      .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
      .build()
    val thread = new Thread(new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions {
        gatewayServer.start()
      }
    })
    thread.setName("py4j-gateway-init")
    thread.setDaemon(true)
    thread.start()
        
    // Wait until the gateway server has started, so that we know which port is it bound to.
    // `gatewayServer.start()` will start a new thread and run the server code there, after
    // initializing the socket, so the thread started above will end as soon as the server is
    // ready to serve connections.
    thread.join()

在启动GatewayServer后,再通过ProcessBuilder构造子进程执行Python脚本,等待Python脚本执行完成后,根据exitCode判断是否执行成功,若执行失败则抛出异常,最后关闭gatewayServer。

    // Launch Python process
    val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
    try {
      val process = builder.start()
 
      new RedirectThread(process.getInputStream, System.out, "redirect output").start()
 
      val exitCode = process.waitFor()
      if (exitCode != 0) {
        throw new SparkUserAppException(exitCode)
      }
    } finally {
      gatewayServer.shutdown()
    }

2.调用方法

2、1 调用代码

PythonRunner的main方法中需要传入三个参数:

  • pythonFile:执行的python脚本
  • pyFiles:需要添加到PYTHONPATH的其他python脚本
  • otherArgs:传入python脚本的参数数组
    val pythonFile = args(0)
    val pyFiles = args(1)
    val otherArgs = args.slice(2, args.length)

具体样例代码如下,scala样例代码:

package com.huawei.bigdata.spark.examples
 
import org.apache.spark.deploy.PythonRunner
import org.apache.spark.sql.SparkSession
 
object RunPythonExample {
  def main(args: Array[String]) {
    val pyFilePath = args(0)
    val pyFiles = args(1)
    val spark = SparkSession
      .builder()
      .appName("RunPythonExample")
      .getOrCreate()
 
    runPython(pyFilePath, pyFiles)
 
    spark.stop()
  }
 
  def runPython(pyFilePath: String, pyFiles :String) : Unit = {
    val inputPath = "-i /input"
    val outputPath = "-o /output"
    PythonRunner.main(Array(pyFilePath, pyFiles, inputPath, outputPath))
  }
}

python样例代码:

#!/usr/bin/env python
# coding: utf-8
import sys
import argparse
 
argparser = argparse.ArgumentParser(description="ParserMainEntrance")
argparser.add_argument('--input', '-i', help="input path", default=list(), required=True)
argparser.add_argument('--output', '-o', help="output path", default=list(), required=True)
arglist = argparser.parse_args()
 
def getTargetPath(input_path, output_path):
    try:
        print("input path: {}".format(input_path))
        print("output path: {}".format(output_path))
        return True
    except Exception as ex:
        print("error with: {}".format(ex))
        return False
 
if __name__ == "__main__":
    ret = getTargetPath(arglist.input, arglist.output)
    if ret:
        sys.exit(0)
    else:
        sys.exit(1)

2、2 运行命令

​ 执行python脚本需要设置pythonExec,即执行python脚本所使用的执行环境。默认情况下,使用的执行器为python(Spark 2.4 及以下)或 python3 (Spark 3.0 及以上)。

    //Spark 2.4.5
    val sparkConf = new SparkConf()
    val secret = Utils.createSecret(sparkConf)
    val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
      .orElse(sparkConf.get(PYSPARK_PYTHON))
      .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
      .orElse(sys.env.get("PYSPARK_PYTHON"))
      .getOrElse("python")
 
	//Spark 3.1.1
    val sparkConf = new SparkConf()
    val secret = Utils.createSecret(sparkConf)
    val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
      .orElse(sparkConf.get(PYSPARK_PYTHON))
      .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
      .orElse(sys.env.get("PYSPARK_PYTHON"))
      .getOrElse("python3")

如果要手动指定pythonExec,需要在执行前设置环境变量(无法通过spark-defaults传入)。在cluster模式下,可以通过 --conf “spark.executorEnv.PYSPARK_PYTHON=python3” --conf “spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3” 设置。driver端还可以通过export PYSPARK_PYTHON=python3 设置环境变量。
​ 若需要上传pyhton包,可以通过 --archive python.tar.gz 的方式上传。

​ 为了使应用能够获取到py脚本文件,还需要在启动命令中添加 --file pythonFile.py 将python脚本上传到 yarn 上。

​ 运行命令参考如下:

spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" /usr/local/test.jar test.py test.py

如果需要使用其他python环境,而非节点上已安装的,可以通过 --archives 上传python压缩包,再通过环境变量指定pythonExec,例如:

spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --archives /usr/local/python.tar.gz#myPython --conf "spark.executorEnv.PYSPARK_PYTHON=myPython/bin/python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=myPython/bin/python3" /usr/local/test.jar test.py test.py
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。