【Hadoop】【Yarn】ResourceManager中的web服务

举报
沙漠里的果果酱 发表于 2023/08/10 11:38:39 2023/08/10
【摘要】 【Hadoop】【Yarn】ResourceManager中的web服务

ResourceManager.startWepApp()

protected void startWepApp() {
  Map<String, String> serviceConfig = null;
  Configuration conf = getConfig();

  RMWebAppUtil.setupSecurityAndFilters(conf, getClientRMService().rmDTSecretManager);

  Map<String, String> params = new HashMap<String, String>();
  //yarn.webapp.api-service.enable
  if (getConfig().getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE, false)) {
    String apiPackages = "org.apache.hadoop.yarn.service.webapp;" +
        "org.apache.hadoop.yarn.webapp";
    params.put("com.sun.jersey.config.property.resourceConfigClass",
        "com.sun.jersey.api.core.PackagesResourceConfig");
    params.put("com.sun.jersey.config.property.packages", apiPackages);
  }

  Builder<ResourceManager> builder =
      WebApps.$for("cluster", ResourceManager.class, this, "ws")
          .with(conf)
          .withServlet("API-Service", "/app/*",
              ServletContainer.class, params, false)
          .withHttpSpnegoPrincipalKey(
              YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
          .withHttpSpnegoKeytabKey(
              YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
          .withCSRFProtection(YarnConfiguration.RM_CSRF_PREFIX)
          .withXFSProtection(YarnConfiguration.RM_XFS_PREFIX)
          .at(webAppAddress);
  //获取RM的yarn.resourcemanager.webapp.https.address.%{i}:yarn.resourcemanager.webapp.https.port
  String proxyHostAndPort = rmContext.getProxyHostAndPort(conf);
  //如果proxyHostAndPort和真实的rm的webapp地址相同,那么说明没有使用代理。
  if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
      equals(proxyHostAndPort)) {
    if (HAUtil.isHAEnabled(conf)) {
      fetcher = new AppReportFetcher(conf);
    } else {
      fetcher = new AppReportFetcher(conf, getClientRMService());
    }
    builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
    builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
    builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE,
        HostAndPort.fromString(proxyHostAndPort).getHostText());
  }

  WebAppContext uiWebAppContext = null;
  //如果支持ui2(yarn.webapp.ui2.enable)
  if (getConfig().getBoolean(YarnConfiguration.YARN_WEBAPP_UI2_ENABLE,
      YarnConfiguration.DEFAULT_YARN_WEBAPP_UI2_ENABLE)) {
    //yarn.webapp.ui2.war-file-path,war包在rm上的路径
    String onDiskPath = getConfig().get(YarnConfiguration.YARN_WEBAPP_UI2_WARFILE_PATH);

    uiWebAppContext = new WebAppContext();
    //ui2的uri上下文为:ui2
    uiWebAppContext.setContextPath(UI2_WEBAPP_NAME);
    //如果上述路径上面没有配置,那么需要从类路径中加载war包
    if (null == onDiskPath) {
      String war = "hadoop-yarn-ui-" + VersionInfo.getVersion() + ".war";
      URLClassLoader cl = (URLClassLoader) ClassLoader.getSystemClassLoader();
      URL url = cl.findResource(war);
      //如果类路径上面没有这个war包,那么需要查看类路径./webapp/ui2是否存在,如果已经存在,则说明ui2已经被解压。
      if (null == url) {
        onDiskPath = getWebAppsPath("ui2");
      } else {
        onDiskPath = url.getFile();
      }
    }
    //如果配置项(yarn.webapp.ui2.war-file-path)中没有配置任何war包,而且类路径上面也没有war包,且类路径上面也没有./webapps/ui2目录,那么说明没有要加载的web应用。
    if (onDiskPath == null || onDiskPath.isEmpty()) {
        LOG.error("No war file or webapps found for ui2 !");
    } else {
      if (onDiskPath.endsWith(".war")) {
        uiWebAppContext.setWar(onDiskPath);
        LOG.info("Using war file at: " + onDiskPath);
      } else {
        uiWebAppContext.setResourceBase(onDiskPath);
        LOG.info("Using webapps at: " + onDiskPath);
      }
    }
  }

  webApp = builder.start(new RMWebApp(this), uiWebAppContext);
}

WebApps.start()启动该webapp。每一个WebApp.Builder对象表示一个web应用。

public WebApp start(WebApp webapp, WebAppContext ui2Context) {
    WebApp webApp = build(webapp);
    HttpServer2 httpServer = webApp.httpServer();
    if (ui2Context != null) {
      addFiltersForNewContext(ui2Context);
      httpServer.addHandlerAtFront(ui2Context);
    }
    try {
      httpServer.start();
      LOG.info("Web app " + name + " started at "
          + httpServer.getConnectorAddress(0).getPort());
    } catch (IOException e) {
      throw new WebAppException("Error starting http server", e);
    }
    return webApp;
  }


WebApps.Builder.build()

public WebApp build(WebApp webapp) {
  if (webapp == null) {
    webapp = new WebApp() {
      @Override
      public void setup() {
        // Defaults should be fine in usual cases
      }
    };
  }
  //对于RMWebApp
  //name: "cluster"
  //wsName: "ws"
  webapp.setName(name);
  webapp.setWebServices(wsName);
  String basePath = "/" + name;
  webapp.setRedirectPath(basePath);
  List<String> pathList = new ArrayList<String>();
  //如果basePath为"/",说明接收所有路径的请求。
  if (basePath.equals("/")) { 
    webapp.addServePathSpec("/*");
    pathList.add("/*");
  }  else {
    //对于rmweb的场景,这里的basePath为"/cluster"
    webapp.addServePathSpec(basePath);
    webapp.addServePathSpec(basePath + "/*");
    pathList.add(basePath + "/*");
  }
  if (wsName != null && !wsName.equals(basePath)) {
    if (wsName.equals("/")) { 
      webapp.addServePathSpec("/*");
      pathList.add("/*");
    } else {
      webapp.addServePathSpec("/" + wsName);
      webapp.addServePathSpec("/" + wsName + "/*");
      pathList.add("/" + wsName + "/*");
    }
  }

  for (ServletStruct s : servlets) {
    if (!pathList.contains(s.spec)) {
      // The servlet told us to not load-existing filters, but we still want
      // to add the default authentication filter always, so add it to the
      // pathList
      if (!s.loadExistingFilters) {
        pathList.add(s.spec);
      }
    }
  }
  if (conf == null) {
    conf = new Configuration();
  }
  try {
    if (application != null) {
      webapp.setHostClass(application.getClass());
    } else {
      String cls = inferHostClass();
      LOG.debug("setting webapp host class to {}", cls);
      webapp.setHostClass(Class.forName(cls));
    }
    if (devMode) {
      if (port > 0) {
        try {
          new URL("http://localhost:"+ port +"/__stop").getContent();
          LOG.info("stopping existing webapp instance");
          Thread.sleep(100);
        } catch (ConnectException e) {
          LOG.info("no existing webapp instance found: {}", e.toString());
        } catch (Exception e) {
          // should not be fatal
          LOG.warn("error stopping existing instance: {}", e.toString());
        }
      } else {
        LOG.error("dev mode does NOT work with ephemeral port!");
        System.exit(1);
      }
    }
    String httpScheme;
    if (this.httpPolicy == null) {
      httpScheme = WebAppUtils.getHttpSchemePrefix(conf);
    } else {
      httpScheme =
          (httpPolicy == Policy.HTTPS_ONLY) ? WebAppUtils.HTTPS_PREFIX
              : WebAppUtils.HTTP_PREFIX;
    }
    //HttpServer2是hadoop提供的嵌入式jetty web容器,通过java接口来启动web服务,不需要显式部署jetty。
    HttpServer2.Builder builder = new HttpServer2.Builder()
        .setName(name).setConf(conf).setFindPort(findPort)
        .setACL(new AccessControlList(conf.get(
            YarnConfiguration.YARN_ADMIN_ACL,
            YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)))
        .setPathSpec(pathList.toArray(new String[0]));
    // Get port ranges from config.
    // MR框架中启动AM的webApp端口随机范围可以使用参数:yarn.app.mapreduce.am.webapp.port-range配置
    IntegerRanges ranges = null;
    if (portRangeConfigKey != null) {
      ranges = conf.getRange(portRangeConfigKey, "");
    }
    int startPort = port;
    if (ranges != null && !ranges.isEmpty()) {
      // Set port ranges if its configured.
      startPort = ranges.getRangeStart();
      builder.setPortRanges(ranges);
    }
    builder.addEndpoint(URI.create(httpScheme + bindAddress + ":" + startPort));
    boolean hasSpnegoConf = spnegoPrincipalKey != null
        && conf.get(spnegoPrincipalKey) != null && spnegoKeytabKey != null
        && conf.get(spnegoKeytabKey) != null;

    if (hasSpnegoConf) {
      builder.setUsernameConfKey(spnegoPrincipalKey)
          .setKeytabConfKey(spnegoKeytabKey)
          .setSecurityEnabled(UserGroupInformation.isSecurityEnabled());
    }

    if (httpScheme.equals(WebAppUtils.HTTPS_PREFIX)) {
      WebAppUtils.loadSslConfiguration(builder, conf);
    }

    HttpServer2 server = builder.build();

    for(ServletStruct struct: servlets) {
      if (!struct.loadExistingFilters) {
        server.addInternalServlet(struct.name, struct.spec,
            struct.clazz, struct.params);
      } else {
        server.addServlet(struct.name, struct.spec, struct.clazz);
      }
    }
    for(Map.Entry<String, Object> entry : attributes.entrySet()) {
      server.setAttribute(entry.getKey(), entry.getValue());
    }
    Map<String, String> params = getConfigParameters(csrfConfigPrefix);

    if (hasCSRFEnabled(params)) {
      LOG.info("CSRF Protection has been enabled for the {} application. "
               + "Please ensure that there is an authentication mechanism "
               + "enabled (kerberos, custom, etc).",
               name);
      String restCsrfClassName = RestCsrfPreventionFilter.class.getName();
      HttpServer2.defineFilter(server.getWebAppContext(), restCsrfClassName,
                               restCsrfClassName, params,
                               new String[] {"/*"});
    }

    params = getConfigParameters(xfsConfigPrefix);

    if (hasXFSEnabled()) {
      String xfsClassName = XFrameOptionsFilter.class.getName();
      HttpServer2.defineFilter(server.getWebAppContext(), xfsClassName,
          xfsClassName, params,
          new String[] {"/*"});
    }

    HttpServer2.defineFilter(server.getWebAppContext(), "guice",
      GuiceFilter.class.getName(), null, new String[] { "/*" });

    webapp.setConf(conf);
    webapp.setHttpServer(server);

  } catch (ClassNotFoundException e) {
    throw new WebAppException("Error starting http server", e);
  } catch (IOException e) {
    throw new WebAppException("Error starting http server", e);
  }
  Injector injector = Guice.createInjector(webapp, new AbstractModule() {
    @Override
    protected void configure() {
      if (api != null) {
        bind(api).toInstance(application);
      }
    }
  });
  LOG.info("Registered webapp guice modules");
  // save a guice filter instance for webapp stop (mostly for unit tests)
  webapp.setGuiceFilter(injector.getInstance(GuiceFilter.class));
  if (devMode) {
    injector.getInstance(Dispatcher.class).setDevMode(devMode);
    LOG.info("in dev mode!");
  }
  return webapp;
}
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。