Oozie源码分析 (二) : Oozie的服务端执行过程

举报
那人好像一条狗~ 发表于 2020/06/24 10:55:11 2020/06/24
【摘要】 Oozie(驭象者)是Yahoo开发的工作流引擎,主要用于管理Hadoop任务(支持MapReduce、Spark、Pig、Hive),把这些任务以DAG(有向无环图)方式串接起来。

Oozie(驭象者)Yahoo开发的工作流引擎,主要用于管理Hadoop任务(支持MapReduce、Spark、Pig、Hive),把这些任务以DAG(有向无环图)方式串接起来。

上一篇文章分析了Oozie的客户端启动流程,那么在客户端启动后,服务端是如何执行的呢?本文将继续分析。

由于Oozie是运行在Tomcat中的Servlet应用,通过WEB-INF目录下的web.xml可以看到相关的Listener,涉及的servletFilter

在客户端通过创建Http连接,将请求发送到服务端后,服务端开始做相应的处理如先进行Filter.doFilter()工作

然后调用相应的Servletservice()以及doPost()doGet()方法进行进一步地处理。


1、  首先在Server端对收到的http请求进行检查,由于请求是通过浏览器过来的,直接会触发casfilter校验。

AuthFilter.doFilter()
final HttpServletRequest hsreq = (HttpServletRequest)request;
 
 if (hsreq.getHeader("User-Agent").startsWith("Mozilla"))
 {
    filterChain.doFilter(request, response);
    return;
 }

2、组件内部自己的健康检查或者当前集群为非安全集群,均不做安全校验。通过了FilterChain上的所有Filter校验后,开始调用相应的Servlet的service()。

//The health check request from nodeAgent without safety certification
 if (SERVICE_STATUS_URL.equals(hsreq.getServletPath())
         ||  !ClusterInfoUtils.isSecurityCluster())
 {
     
     filterChainWrapper.doFilter(request, response);
     return;
 }

可以看到当通过shell命令启动一个Oozie作业时,客户端请求的URL的路径是包含/v2/jobs的,因此会找到对应的servlet-namev1jobsservlet,最后交由org.apache.oozie.servlet.V1JobsServlet类进行处理。

调用V1JobsServlet.service()方法(该方法定义在V1JobsServlet父类BaseJobsServlet的父类JsonRestServlet中)

,由于该请求的Method“POST”,则会调用V1JobsServlet.doPost()方法(该方法定义在V1JobsServlet的父类BaseJobsServlet中)

class BaseJobsServlet
protected void doPost(HttpServletRequest request,
                  HttpServletResponse response) throws ServletException, IOException {
 
     XConfiguration conf = new XConfiguration(request.getInputStream());
 
     stopCron();
 
           conf = conf.trim();
                    conf = conf.resolve();
 
     validateJobConfiguration(conf);
     String requestUser = getUser(request);
     if (!requestUser.equals(UNDEF)) {
         conf.set(OozieClient.USER_NAME, requestUser);
     }
     BaseJobServlet.checkAuthorizationForApp(conf);
     JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME),   conf.get(OozieClient.GROUP_NAME), conf);
 
     JSONObject json = submitJob(request, conf);//调用子类具体的实现
     startCron();
     sendJsonResponse(response, HttpServletResponse.SC_CREATED, json);
 }

JSONObject json = submitJob(request, conf);

submitJob()的具体的实现为V1JobsServlet.submitJob()

class V1JobsServlet
 /**
  * v1 service implementation to submit a job, either workflow or coordinator
  */
 @Override
 protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException, IOException {
     /*其他代码*/
         if (wfPath != null) {
          json = submitWorkflowJob(request, conf);
         }
     /*其他代码*/
     return json;
 }

由于当前操作类型不是dryrun,所以直接执行dagEngine.submitJob()

class V1JobsServlet
/**
 * v1 service implementation to submit a workflow job
 */
 @SuppressWarnings("unchecked")
 private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
 
         if (action != null) {
             dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
         }
         if (dryrun) {
             id = dagEngine.dryRunSubmit(conf);
         }
         else {
          id = dagEngine.submitJob(conf, startJob);
         }
         json.put(JsonTags.JOB_ID, id);
     return json;
 }

DagEngine.submitJob()

class DagEngine
public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
     validateSubmitConfiguration(conf);
     try {
         String jobId;
         SubmitXCommand submit = new SubmitXCommand(conf);
         jobId = submit.call();
         if (startJob) {
             start(jobId);
         }
         return jobId;
     }
     /*其他代码*/
 }

工作流引擎调用了SubXCommand.call()方法(该方法定义在SubXCommand的祖先类XCommand中),在call()方法中会调用相应的execute()方法(这里面调用的是SubXCommand中重写的execute()方法)

class XCommmand
@Override
 public final T call() throws CommandException {
     try {
         try {
             if (isSynchronous || || || ||) {
                 verifyPrecondition();
                 Instrumentation.Cron executeCron = new Instrumentation.Cron();
                 executeCron.start();
                 ret = execute();
                 executeCron.stop();

最后执行SubXCommand.execute()方法

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。