以java API方式提交spark作业
一、文章背景
在初期学习spark的时候是以命令行的方式提交Job到集群环境中运行的,试想当一个作业需要重复去执行的时候且linux脚本不会搞,是不是很尴尬!随着对spark的深入了解和查看官网提供的文档示例,了解到spark提供了以sparkLauncher作为spark job提交的唯一入口,可以用JAVA API编程的方式提交spark job,可以在IDEA中通过创建sparkLauncher对象,进行参数设置后直接点击Run 运行包含Job的Main类就能成功提交job进行运行。还可以集成到spring项目中,避免了以拼接cmd命令的方式集成到项目中带来的安全隐患。
二、实现样例
2.1 主函数样例
public class TestSparkLauncher {
public static void main(String[] args) throws IOException, InterruptedException {
// 用于配置运行spark的环境变量
HashMap env = new HashMap();
env.put("HADOOP_CONF_DIR", "环境上安装的hadoop配置文件目录");
env.put("JAVA_HOME", "环境上的java home");
// 用于指定spark运行时使用的配置文件,默认加载的是环境上安装的spark home下的conf目录
env.put("SPARK_CONF_DIR", "自定义的spark配置文件目录");
SparkLauncher sparkLauncher = new SparkLauncher(env);
sparkLauncher.setAppName("spark job 名称");
sparkLauncher.setAppResource(" spark jar包在hdfs上的路径");
sparkLauncher.setSparkHome("环境上安装的spark路径");
sparkLauncher.setMainClass(" spark jar包的运行主函数名称");
sparkLauncher.setDeployMode("spark 运行模式 client 或 cluster 二选一");
// 提交spark job 获取process
Process process = sparkLauncher.launch();
// client模式下用于输出运行日志
InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();
InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();
System.out.println("Waiting for finish...");
// client模式下用于监控spark job 运行结果
int exitCode = process.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
}
2.2 记录日志线程样例
public class InputStreamReaderRunnable implements Runnable {
private BufferedReader reader;
private String name;
public InputStreamReaderRunnable(InputStream is, String name) {
this.reader = new BufferedReader(new InputStreamReader(is));
this.name = name;
}
public void run() {
System.out.println("InputStream_" + name + ":");
try {
String line = reader.readLine();
while (line != null) {
System.out.println(line);
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
三、选用这种方式的优劣
优势:通过SparkLanuncher.lanunch()方法获取一个process进程,然后调用进程的process.waitFor()方法等待线程返回结果,获取的输出信息一切都在掌握之中;
劣势:使用这种方式需要自己管理运行过程中的输出信息,比较麻烦。
四、实现过程中遇到的问题
4.1.运行时找不到java_home
在用于配置spark的运行时环境变量的env集合中添加java_home配置或者在sparkLauncher对象内setJavaHome
4.2.开启kerberos认证后,job提交运行失败
在sparkLauncher对象中setConf中以key-value形式配置认证文件及名称
4.3.spark版本兼容较差,
如果日志文件中出现序列化ID不想等的问题,请查看集成的spring项目中的sparkjar包是否与环境安装的spark版本一致。
以上为项目实现过程demo以及部分问题总结,不足之处,请多多指教。
- 点赞
- 收藏
- 关注作者
评论(0)