【Hadoop】【Yarn】ResourceManager中的web服务
【摘要】 【Hadoop】【Yarn】ResourceManager中的web服务
ResourceManager.startWepApp()
/**
 * Builds and starts the ResourceManager web application.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Install security settings and filters through {@code RMWebAppUtil}.</li>
 *   <li>Register the "API-Service" servlet; its Jersey package parameters are
 *       only populated when {@code yarn.webapp.api-service.enable} is true.</li>
 *   <li>When the proxy host:port equals the resolved RM web app address,
 *       register the proxy servlet and its attributes inside this web app.</li>
 *   <li>When UI2 is enabled, locate the UI2 war file (or an unpacked webapps
 *       directory) and prepare a {@code WebAppContext} for it.</li>
 *   <li>Start the web app and store it in {@code webApp}.</li>
 * </ol>
 */
protected void startWepApp() {
  Configuration conf = getConfig();
  RMWebAppUtil.setupSecurityAndFilters(conf, getClientRMService().rmDTSecretManager);

  Map<String, String> params = new HashMap<String, String>();
  // yarn.webapp.api-service.enable
  if (getConfig().getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE, false)) {
    String apiPackages = "org.apache.hadoop.yarn.service.webapp;"
        + "org.apache.hadoop.yarn.webapp";
    params.put("com.sun.jersey.config.property.resourceConfigClass",
        "com.sun.jersey.api.core.PackagesResourceConfig");
    params.put("com.sun.jersey.config.property.packages", apiPackages);
  }

  Builder<ResourceManager> builder =
      WebApps.$for("cluster", ResourceManager.class, this, "ws")
          .with(conf)
          .withServlet("API-Service", "/app/*",
              ServletContainer.class, params, false)
          .withHttpSpnegoPrincipalKey(
              YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
          .withHttpSpnegoKeytabKey(
              YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
          .withCSRFProtection(YarnConfiguration.RM_CSRF_PREFIX)
          .withXFSProtection(YarnConfiguration.RM_XFS_PREFIX)
          .at(webAppAddress);

  // Proxy host:port as reported by the RM context. Presumably derived from the
  // RM web app address/port settings (e.g. yarn.resourcemanager.webapp.https.address)
  // -- confirm against RMContext.getProxyHostAndPort.
  String proxyHostAndPort = rmContext.getProxyHostAndPort(conf);
  // If the proxy address is the same as the RM's own web app address, no
  // separate proxy is in use, so the proxy servlet is registered in this web app.
  if (WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf)
      .equals(proxyHostAndPort)) {
    if (HAUtil.isHAEnabled(conf)) {
      fetcher = new AppReportFetcher(conf);
    } else {
      fetcher = new AppReportFetcher(conf, getClientRMService());
    }
    builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
        ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
    builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
    builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE,
        HostAndPort.fromString(proxyHostAndPort).getHostText());
  }

  WebAppContext uiWebAppContext = null;
  // Only when UI2 is enabled (yarn.webapp.ui2.enable).
  if (getConfig().getBoolean(YarnConfiguration.YARN_WEBAPP_UI2_ENABLE,
      YarnConfiguration.DEFAULT_YARN_WEBAPP_UI2_ENABLE)) {
    // yarn.webapp.ui2.war-file-path: location of the war file on the RM host.
    String onDiskPath = getConfig().get(YarnConfiguration.YARN_WEBAPP_UI2_WARFILE_PATH);

    uiWebAppContext = new WebAppContext();
    // UI2 is served under the UI2_WEBAPP_NAME context path.
    uiWebAppContext.setContextPath(UI2_WEBAPP_NAME);

    // No explicit path configured: look the war file up on the class path.
    if (null == onDiskPath) {
      String war = "hadoop-yarn-ui-" + VersionInfo.getVersion() + ".war";
      // Ask the system class loader for the resource directly. Casting it to
      // URLClassLoader throws ClassCastException on Java 9+, where the
      // application class loader is no longer a URLClassLoader.
      URL url = ClassLoader.getSystemClassLoader().getResource(war);
      if (null == url) {
        // War file not on the class path: fall back to the "ui2" webapps path
        // (presumably an already-unpacked webapps/ui2 directory -- see getWebAppsPath).
        onDiskPath = getWebAppsPath("ui2");
      } else {
        onDiskPath = url.getFile();
      }
    }

    // Neither the configured path, the class-path war, nor the webapps
    // directory yielded anything: there is no UI2 application to load.
    // NOTE: the UI2 context created above is still passed to builder.start().
    if (onDiskPath == null || onDiskPath.isEmpty()) {
      LOG.error("No war file or webapps found for ui2 !");
    } else {
      if (onDiskPath.endsWith(".war")) {
        uiWebAppContext.setWar(onDiskPath);
        LOG.info("Using war file at: " + onDiskPath);
      } else {
        uiWebAppContext.setResourceBase(onDiskPath);
        LOG.info("Using webapps at: " + onDiskPath);
      }
    }
  }

  webApp = builder.start(new RMWebApp(this), uiWebAppContext);
}
WebApps.Builder.start()启动该webapp。每一个WebApps.Builder对象表示一个web应用。
/**
 * Builds the given web app and starts its HTTP server.
 *
 * @param webapp the web app to build; passed straight to {@code build}
 * @param ui2Context optional extra context; when non-null, filters are added
 *     to it and it is placed at the front of the server's handlers before start
 * @return the built web app
 * @throws WebAppException if the HTTP server fails to start with an IOException
 */
public WebApp start(WebApp webapp, WebAppContext ui2Context) {
  final WebApp built = build(webapp);
  final HttpServer2 server = built.httpServer();

  final boolean hasUi2Context = ui2Context != null;
  if (hasUi2Context) {
    // Filters are attached first, then the context goes ahead of existing handlers.
    addFiltersForNewContext(ui2Context);
    server.addHandlerAtFront(ui2Context);
  }

  try {
    server.start();
    final int boundPort = server.getConnectorAddress(0).getPort();
    LOG.info("Web app " + name + " started at " + boundPort);
  } catch (IOException e) {
    throw new WebAppException("Error starting http server", e);
  }

  return built;
}
WebApps.Builder.build()
/**
 * Assembles a {@code WebApp} backed by an {@code HttpServer2} without starting it.
 *
 * <p>The method configures the web app's name, web-services name, redirect
 * path and serve path specs; builds the HTTP server (endpoint, admin ACL,
 * path specs, optional port ranges, optional SPNEGO settings, SSL when the
 * scheme is https); registers the builder's servlets and attributes; installs
 * the CSRF, XFS and guice filters; and finally creates the Guice injector and
 * stores its {@code GuiceFilter} on the web app.
 *
 * <p>In dev mode with a non-positive port the method logs an error and calls
 * {@code System.exit(1)}.
 *
 * @param webapp the web app to configure; when null, an anonymous
 *     {@code WebApp} with an empty {@code setup()} is used
 * @return the configured web app, with its conf and HTTP server set
 * @throws WebAppException if the host class cannot be loaded or an
 *     IOException occurs while building the server
 */
public WebApp build(WebApp webapp) {
if (webapp == null) {
webapp = new WebApp() {
@Override
public void setup() {
// Defaults should be fine in usual cases
}
};
}
// For RMWebApp:
// name: "cluster"
// wsName: "ws"
webapp.setName(name);
webapp.setWebServices(wsName);
String basePath = "/" + name;
webapp.setRedirectPath(basePath);
// Path specs collected here are later handed to HttpServer2.Builder.setPathSpec.
List<String> pathList = new ArrayList<String>();
// A basePath of "/" means requests on every path are served.
if (basePath.equals("/")) {
webapp.addServePathSpec("/*");
pathList.add("/*");
} else {
// For the RM web app, basePath here is "/cluster".
webapp.addServePathSpec(basePath);
webapp.addServePathSpec(basePath + "/*");
pathList.add(basePath + "/*");
}
// NOTE(review): basePath always starts with "/" while wsName (e.g. "ws") is
// used below without one, so this equality check looks unlikely to match --
// confirm the intent.
if (wsName != null && !wsName.equals(basePath)) {
if (wsName.equals("/")) {
webapp.addServePathSpec("/*");
pathList.add("/*");
} else {
webapp.addServePathSpec("/" + wsName);
webapp.addServePathSpec("/" + wsName + "/*");
pathList.add("/" + wsName + "/*");
}
}
for (ServletStruct s : servlets) {
if (!pathList.contains(s.spec)) {
// The servlet told us to not load-existing filters, but we still want
// to add the default authentication filter always, so add it to the
// pathList
if (!s.loadExistingFilters) {
pathList.add(s.spec);
}
}
}
if (conf == null) {
conf = new Configuration();
}
try {
// Host class: the application's own class when one was supplied,
// otherwise the class named by inferHostClass().
if (application != null) {
webapp.setHostClass(application.getClass());
} else {
String cls = inferHostClass();
LOG.debug("setting webapp host class to {}", cls);
webapp.setHostClass(Class.forName(cls));
}
if (devMode) {
if (port > 0) {
// Best effort: request /__stop on localhost at the configured port to
// stop an instance that may already be running; failures are only logged.
try {
new URL("http://localhost:"+ port +"/__stop").getContent();
LOG.info("stopping existing webapp instance");
Thread.sleep(100);
} catch (ConnectException e) {
LOG.info("no existing webapp instance found: {}", e.toString());
} catch (Exception e) {
// should not be fatal
LOG.warn("error stopping existing instance: {}", e.toString());
}
} else {
LOG.error("dev mode does NOT work with ephemeral port!");
System.exit(1);
}
}
// Scheme prefix: taken from the configuration when no explicit policy was
// set on the builder, otherwise derived from the policy.
String httpScheme;
if (this.httpPolicy == null) {
httpScheme = WebAppUtils.getHttpSchemePrefix(conf);
} else {
httpScheme =
(httpPolicy == Policy.HTTPS_ONLY) ? WebAppUtils.HTTPS_PREFIX
: WebAppUtils.HTTP_PREFIX;
}
// HttpServer2 is the embedded Jetty web container provided by Hadoop; the web
// service is started through the Java API, with no separate Jetty deployment.
HttpServer2.Builder builder = new HttpServer2.Builder()
.setName(name).setConf(conf).setFindPort(findPort)
.setACL(new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)))
.setPathSpec(pathList.toArray(new String[0]));
// Get port ranges from config.
// In the MR framework, the port range for the AM web app can be configured
// with yarn.app.mapreduce.am.webapp.port-range.
IntegerRanges ranges = null;
if (portRangeConfigKey != null) {
ranges = conf.getRange(portRangeConfigKey, "");
}
int startPort = port;
if (ranges != null && !ranges.isEmpty()) {
// Set port ranges if its configured.
startPort = ranges.getRangeStart();
builder.setPortRanges(ranges);
}
builder.addEndpoint(URI.create(httpScheme + bindAddress + ":" + startPort));
// SPNEGO settings are applied only when both keys were given to the builder
// and both have values in the configuration.
boolean hasSpnegoConf = spnegoPrincipalKey != null
&& conf.get(spnegoPrincipalKey) != null && spnegoKeytabKey != null
&& conf.get(spnegoKeytabKey) != null;
if (hasSpnegoConf) {
builder.setUsernameConfKey(spnegoPrincipalKey)
.setKeytabConfKey(spnegoKeytabKey)
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled());
}
if (httpScheme.equals(WebAppUtils.HTTPS_PREFIX)) {
WebAppUtils.loadSslConfiguration(builder, conf);
}
HttpServer2 server = builder.build();
// Servlets that opted out of existing filters are added as internal servlets
// (with their params); the others are added as regular servlets.
for(ServletStruct struct: servlets) {
if (!struct.loadExistingFilters) {
server.addInternalServlet(struct.name, struct.spec,
struct.clazz, struct.params);
} else {
server.addServlet(struct.name, struct.spec, struct.clazz);
}
}
for(Map.Entry<String, Object> entry : attributes.entrySet()) {
server.setAttribute(entry.getKey(), entry.getValue());
}
// Filters are defined on the server's web app context in this order:
// CSRF (if enabled), XFS (if enabled), then guice.
Map<String, String> params = getConfigParameters(csrfConfigPrefix);
if (hasCSRFEnabled(params)) {
LOG.info("CSRF Protection has been enabled for the {} application. "
+ "Please ensure that there is an authentication mechanism "
+ "enabled (kerberos, custom, etc).",
name);
String restCsrfClassName = RestCsrfPreventionFilter.class.getName();
HttpServer2.defineFilter(server.getWebAppContext(), restCsrfClassName,
restCsrfClassName, params,
new String[] {"/*"});
}
params = getConfigParameters(xfsConfigPrefix);
if (hasXFSEnabled()) {
String xfsClassName = XFrameOptionsFilter.class.getName();
HttpServer2.defineFilter(server.getWebAppContext(), xfsClassName,
xfsClassName, params,
new String[] {"/*"});
}
HttpServer2.defineFilter(server.getWebAppContext(), "guice",
GuiceFilter.class.getName(), null, new String[] { "/*" });
webapp.setConf(conf);
webapp.setHttpServer(server);
} catch (ClassNotFoundException e) {
throw new WebAppException("Error starting http server", e);
} catch (IOException e) {
throw new WebAppException("Error starting http server", e);
}
// The web app itself is a Guice module; the extra module binds the api type
// to the application instance when an api type was supplied.
Injector injector = Guice.createInjector(webapp, new AbstractModule() {
@Override
protected void configure() {
if (api != null) {
bind(api).toInstance(application);
}
}
});
LOG.info("Registered webapp guice modules");
// save a guice filter instance for webapp stop (mostly for unit tests)
webapp.setGuiceFilter(injector.getInstance(GuiceFilter.class));
if (devMode) {
injector.getInstance(Dispatcher.class).setDevMode(devMode);
LOG.info("in dev mode!");
}
return webapp;
}
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)