Oozie源码分析 (二) : Oozie的服务端执行过程
【摘要】 Oozie(驭象者)是Yahoo开发的工作流引擎,主要用于管理Hadoop任务(支持MapReduce、Spark、Pig、Hive),把这些任务以DAG(有向无环图)方式串接起来。
Oozie(驭象者)是Yahoo开发的工作流引擎,主要用于管理Hadoop任务(支持MapReduce、Spark、Pig、Hive),把这些任务以DAG(有向无环图)方式串接起来。
上一篇文章分析了Oozie的客户端启动流程,那么在客户端启动后,服务端是如何执行的呢?本文将继续分析。
由于Oozie是运行在Tomcat中的Servlet应用,通过WEB-INF目录下的web.xml可以看到相关的Listener,涉及的servlet,Filter。
在客户端通过创建Http连接,将请求发送到服务端后,服务端开始做相应的处理如先进行Filter.doFilter()工作
然后调用相应的Servlet的service()以及doPost()或doGet()方法进行进一步地处理。
1、 首先在Server端对收到的http请求进行检查,由于请求是通过浏览器过来的,直接会触发cas的filter校验。
AuthFilter.doFilter()
final HttpServletRequest hsreq = (HttpServletRequest)request;
if (hsreq.getHeader("User-Agent").startsWith("Mozilla"))
{
filterChain.doFilter(request, response);
return;
}
2、组件内部自己的健康检查或者当前集群为非安全集群,均不做安全校验。通过了FilterChain上的所有Filter校验后,开始调用相应的Servlet的service()。
//The health check request from nodeAgent without safety certification
if (SERVICE_STATUS_URL.equals(hsreq.getServletPath())
|| !ClusterInfoUtils.isSecurityCluster())
{
filterChainWrapper.doFilter(request, response);
return;
}
可以看到当通过shell命令启动一个Oozie作业时,客户端请求的URL的路径是包含/v2/jobs的,因此会找到对应的servlet-name为v1jobs的servlet,最后交由org.apache.oozie.servlet.V1JobsServlet类进行处理。
调用V1JobsServlet.service()方法(该方法定义在V1JobsServlet父类BaseJobsServlet的父类JsonRestServlet中)
,由于该请求的Method为“POST”,则会调用V1JobsServlet.doPost()方法(该方法定义在V1JobsServlet的父类BaseJobsServlet中)
class BaseJobsServlet
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
XConfiguration conf = new XConfiguration(request.getInputStream());
stopCron();
conf = conf.trim();
conf = conf.resolve();
validateJobConfiguration(conf);
String requestUser = getUser(request);
if (!requestUser.equals(UNDEF)) {
conf.set(OozieClient.USER_NAME, requestUser);
}
BaseJobServlet.checkAuthorizationForApp(conf);
JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
JSONObject json = submitJob(request, conf);//调用子类具体的实现
startCron();
sendJsonResponse(response, HttpServletResponse.SC_CREATED, json);
}
JSONObject json = submitJob(request, conf);
submitJob()的具体的实现为V1JobsServlet.submitJob()
class V1JobsServlet
/**
* v1 service implementation to submit a job, either workflow or coordinator
*/
@Override
protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException, IOException {
/*其他代码*/
if (wfPath != null) {
json = submitWorkflowJob(request, conf);
}
/*其他代码*/
return json;
}
由于当前操作类型不是dryrun,所以直接执行dagEngine.submitJob()
class V1JobsServlet
/**
* v1 service implementation to submit a workflow job
*/
@SuppressWarnings("unchecked")
private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
if (action != null) {
dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
}
if (dryrun) {
id = dagEngine.dryRunSubmit(conf);
}
else {
id = dagEngine.submitJob(conf, startJob);
}
json.put(JsonTags.JOB_ID, id);
return json;
}
DagEngine.submitJob()
class DagEngine
public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
validateSubmitConfiguration(conf);
try {
String jobId;
SubmitXCommand submit = new SubmitXCommand(conf);
jobId = submit.call();
if (startJob) {
start(jobId);
}
return jobId;
}
/*其他代码*/
}
工作流引擎调用了SubXCommand.call()方法(该方法定义在SubXCommand的祖先类XCommand中),在call()方法中会调用相应的execute()方法(这里面调用的是SubXCommand中重写的execute()方法)
class XCommmand
@Override
public final T call() throws CommandException {
try {
try {
if (isSynchronous || || || ||) {
verifyPrecondition();
Instrumentation.Cron executeCron = new Instrumentation.Cron();
executeCron.start();
ret = execute();
executeCron.stop();
最后执行SubXCommand.execute()方法。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)