以java API方式提交spark作业

举报
baymax_li 发表于 2020/08/19 10:27:26 2020/08/19
【摘要】 spark作为当前主流的计算框架,集成到项目已经越来越普遍,本文介绍的是以SparkLauncher.launch()方式,另一种SparkLauncher.startApplication()不再赘述。

一、文章背景

在初期学习spark的时候是以命令行的方式提交Job到集群环境中运行的,试想当一个作业需要重复去执行的时候且linux脚本不会搞,是不是很尴尬!随着对spark的深入了解和查看官网提供的文档示例,了解到spark提供了以sparkLauncher作为spark job提交的唯一入口,可以用JAVA API编程的方式提交spark job,可以在IDEA中通过创建sparkLauncher对象,进行参数设置后直接点击Run 运行包含JobMain类就能成功提交job进行运行。还可以集成到spring项目中,避免了以拼接cmd命令的方式集成到项目中带来的安全隐患。

二、实现样例

2.1 主函数样例

public class TestSparkLauncher {

     public static void main(String[] args) throws IOException, InterruptedException {

         // 用于配置运行spark的环境变量

         HashMap env = new HashMap();

         env.put("HADOOP_CONF_DIR", "环境上安装的hadoop配置文件目录");

         env.put("JAVA_HOME", "环境上的java home");

         // 用于指定spark运行时使用的配置文件,默认加载的是环境上安装的spark home下的conf目录

         env.put("SPARK_CONF_DIR", "自定义的spark配置文件目录");

         SparkLauncher sparkLauncher = new SparkLauncher(env);

         sparkLauncher.setAppName("spark job 名称");

         sparkLauncher.setAppResource(" spark jar包在hdfs上的路径");

         sparkLauncher.setSparkHome("环境上安装的spark路径");

         sparkLauncher.setMainClass(" spark jar包的运行主函数名称");

         sparkLauncher.setDeployMode("spark 运行模式 client 或 cluster 二选一");

 

         // 提交spark job 获取process

         Process process = sparkLauncher.launch();

         // client模式下用于输出运行日志

         InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");

         Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");

         inputThread.start();

 

         InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");

         Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");

         errorThread.start();

 

         System.out.println("Waiting for finish...");


         // client模式下用于监控spark job 运行结果

         int exitCode = process.waitFor();

         System.out.println("Finished! Exit code:" + exitCode);

 

     }

 

2.2 记录日志线程样例

 

public class InputStreamReaderRunnable implements Runnable {

     private BufferedReader reader;

     private String name;

 

     public InputStreamReaderRunnable(InputStream is, String name) {

         this.reader = new BufferedReader(new InputStreamReader(is));

         this.name = name;

     }

 

     public void run() {

         System.out.println("InputStream_" + name + ":");

         try {

             String line = reader.readLine();

             while (line != null) {

                 System.out.println(line);

                 line = reader.readLine();

             }

             reader.close();

         } catch (IOException e) {

             e.printStackTrace();

         }

     }

 }


三、选用这种方式的优劣

优势:通过SparkLanuncher.lanunch()方法获取一个process进程,然后调用进程的process.waitFor()方法等待线程返回结果,获取的输出信息一切都在掌握之中;

劣势:使用这种方式需要自己管理运行过程中的输出信息,比较麻烦。

四、实现过程中遇到的问题

4.1.运行时找不到java_home

在用于配置spark的运行时环境变量的env集合中添加java_home配置或者在sparkLauncher对象内setJavaHome

 

4.2.开启kerberos认证后,job提交运行失败

sparkLauncher对象中setConf中以key-value形式配置认证文件及名称

4.3.spark版本兼容较差,

如果日志文件中出现序列化ID不想等的问题,请查看集成的spring项目中的sparkjar包是否与环境安装的spark版本一致。

 

以上为项目实现过程demo以及部分问题总结,不足之处,请多多指教。


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。