Oozie源码分析 (二) : Oozie的服务端执行过程
Oozie(驭象者)是Yahoo开发的工作流引擎,主要用于管理Hadoop任务(支持MapReduce、Spark、Pig、Hive),把这些任务以DAG(有向无环图)方式串接起来。
上一篇文章分析了Oozie的客户端启动流程,那么在客户端启动后,服务端是如何执行的呢?本文将继续分析。
由于Oozie是运行在Tomcat中的Servlet应用,通过WEB-INF目录下的web.xml可以看到相关的Listener,涉及的servlet,Filter。
在客户端通过创建Http连接,将请求发送到服务端后,服务端开始做相应的处理如先进行Filter.doFilter()工作
然后调用相应的Servlet的service()以及doPost()或doGet()方法进行进一步地处理。
1、 首先在Server端对收到的http请求进行检查,由于请求是通过浏览器过来的,直接会触发cas的filter校验。
AuthFilter.doFilter() final HttpServletRequest hsreq = (HttpServletRequest)request; if (hsreq.getHeader("User-Agent").startsWith("Mozilla")) { filterChain.doFilter(request, response); return; }
2、组件内部自己的健康检查或者当前集群为非安全集群,均不做安全校验。通过了FilterChain上的所有Filter校验后,开始调用相应的Servlet的service()。
//The health check request from nodeAgent without safety certification if (SERVICE_STATUS_URL.equals(hsreq.getServletPath()) || !ClusterInfoUtils.isSecurityCluster()) { filterChainWrapper.doFilter(request, response); return; }
可以看到当通过shell命令启动一个Oozie作业时,客户端请求的URL的路径是包含/v2/jobs的,因此会找到对应的servlet-name为v1jobs的servlet,最后交由org.apache.oozie.servlet.V1JobsServlet类进行处理。
调用V1JobsServlet.service()方法(该方法定义在V1JobsServlet父类BaseJobsServlet的父类JsonRestServlet中)
,由于该请求的Method为“POST”,则会调用V1JobsServlet.doPost()方法(该方法定义在V1JobsServlet的父类BaseJobsServlet中)
class BaseJobsServlet protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { XConfiguration conf = new XConfiguration(request.getInputStream()); stopCron(); conf = conf.trim(); conf = conf.resolve(); validateJobConfiguration(conf); String requestUser = getUser(request); if (!requestUser.equals(UNDEF)) { conf.set(OozieClient.USER_NAME, requestUser); } BaseJobServlet.checkAuthorizationForApp(conf); JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); JSONObject json = submitJob(request, conf);//调用子类具体的实现 startCron(); sendJsonResponse(response, HttpServletResponse.SC_CREATED, json); }
JSONObject json = submitJob(request, conf);
submitJob()的具体的实现为V1JobsServlet.submitJob()
class V1JobsServlet /** * v1 service implementation to submit a job, either workflow or coordinator */ @Override protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException, IOException { /*其他代码*/ if (wfPath != null) { json = submitWorkflowJob(request, conf); } /*其他代码*/ return json; }
由于当前操作类型不是dryrun,所以直接执行dagEngine.submitJob()
class V1JobsServlet /** * v1 service implementation to submit a workflow job */ @SuppressWarnings("unchecked") private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException { if (action != null) { dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN)); } if (dryrun) { id = dagEngine.dryRunSubmit(conf); } else { id = dagEngine.submitJob(conf, startJob); } json.put(JsonTags.JOB_ID, id); return json; }
DagEngine.submitJob()
class DagEngine public String submitJob(Configuration conf, boolean startJob) throws DagEngineException { validateSubmitConfiguration(conf); try { String jobId; SubmitXCommand submit = new SubmitXCommand(conf); jobId = submit.call(); if (startJob) { start(jobId); } return jobId; } /*其他代码*/ }
工作流引擎调用了SubXCommand.call()方法(该方法定义在SubXCommand的祖先类XCommand中),在call()方法中会调用相应的execute()方法(这里面调用的是SubXCommand中重写的execute()方法)
class XCommmand @Override public final T call() throws CommandException { try { try { if (isSynchronous || || || ||) { verifyPrecondition(); Instrumentation.Cron executeCron = new Instrumentation.Cron(); executeCron.start(); ret = execute(); executeCron.stop();
最后执行SubXCommand.execute()方法。
- 点赞
- 收藏
- 关注作者
评论(0)