Spark内核详解 (4) | Spark 部署模式

举报
不温卜火 发表于 2020/11/30 23:42:28 2020/11/30
【摘要】   大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客...

  大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/

  本片博文为大家带来的是Spark 部署模式。
1


2
Spark支持3种集群管理器(Cluster Manager

分别为:

  1. Standalone:独立模式,Spark 原生的简单集群管理器,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统,使用 Standalone 可以很方便地搭建一个集群;
  2. Hadoop YARN:统一的资源管理机制,在上面可以运行多套计算框架,如 MR、Storm等。根据 Driver 在集群中的位置不同,分为 yarn client 和 yarn cluster;
  3. Apache Mesos:一个强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括 Yarn。

实际上,除了上述这些通用的集群管理器外,Spark 内部也提供了方便用户测试和学习的简单集群部署模式。由于在实际生产环境下使用的绝大多数的集群管理器是 Hadoop YARN,因此我们关注的重点是 Hadoop YARN 模式下的 Spark 集群部署。

一. Yarn 模式运行机制

1.1 YARN Cluster 模式

3

  1. 执行脚本提交任务,实际是启动一个 SparkSubmit 的 JVM 进程;
  2. SparkSubmit 类中的 main方法反射调用Client的main方法;
  3. Client创建Yarn客户端,然后向Yarn发送执行指令:bin/java ApplicationMaster;
  4. Yarn框架收到指令后会在指定的NM中启动ApplicationMaster;
  5. ApplicationMaster启动Driver线程,执行用户的作业;
  6. AM向RM注册,申请资源;
  7. 获取资源后AM向NM发送指令:bin/java CoarseGrainedExecutorBacken;
  8. 启动ExecutorBackend, 并向driver注册.
  9. 注册成功后, ExecutorBackend会创建一个Executor对象.
  10. Driver会给ExecutorBackend分配任务, 并监控任务的执行.

注意:

  • SparkSubmit、ApplicationMaster和CoarseGrainedExecutorBacken是独立的进程;
  • Client和Driver是独立的线程;
  • Executor是一个对象。

1.2 Yarn Client 模式

4

  1. 执行脚本提交任务,实际是启动一个SparkSubmit的 JVM 进程;

  2. SparkSubmit伴生对象中的main方法反射调用用户代码的main方法;

  3. 启动Driver线程,执行用户的作业,并创建ScheduleBackend;

  4. YarnClientSchedulerBackend向RM发送指令:bin/java ExecutorLauncher;

  5. Yarn框架收到指令后会在指定的NM中启动ExecutorLauncher(实际上还是调用ApplicationMaster的main方法);

object ExecutorLauncher { def main(args: Array[String]): Unit = { ApplicationMaster.main(args)
  }

}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. AM向RM注册,申请资源;
  2. 获取资源后AM向NM发送指令:bin/java CoarseGrainedExecutorBacken;
  3. 后面和cluster模式一致

注意

  • SparkSubmit、ExecutorLauncher和CoarseGrainedExecutorBacken是独立的进程;

  • driver不是一个子线程,而是直接运行在SparkSubmit进程的main线程中, 所以sparkSubmit进程不能退出.

二. 运行机制源码分析

2.1 Yarn cluster 模式运行机制源码分析

启动下面的代码:

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

yarn 会按照下面的顺序依次启动了 3 个进程:

SparkSubmit
ApplicationMaster
CoarseGrainedExecutorB ackend


  
 
  • 1
  • 2
  • 3
  • 4
  • 1. bin/spark-submit 启动脚本分析
    启动类org.apache.spark.deploy.SparkSubmit
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

  
 
  • 1

/bin/spark-class

exec "${CMD[@]}"

  
 
  • 1

最终启动类:

/opt/module/jdk1.8.0_172/bin/java -cp /opt/module/spark-yarn/conf/:/opt/module/spark-yarn/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.11-2.1.1.jar 100


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 2. org.apache.spark.deploy.SparkSubmit 源码分析

SparkSubmit伴生对象

main方法

def main(args: Array[String]): Unit = { /* 参数 --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.11-2.1.1.jar 100 */ val appArgs = new SparkSubmitArguments(args) appArgs.action match { // 如果没有指定 action, 则 action 的默认值是:   action = Option(action).getOrElse(SUBMIT) case SparkSubmitAction.SUBMIT => submit(appArgs) case SparkSubmitAction.KILL => kill(appArgs) case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

submit方法

/**
  * 使用提供的参数提交应用程序
  * 有 2 步:
  * 1. 准备启动环境. * 根据集群管理器和部署模式为 child main class 设置正确的 classpath, 系统属性,应用参数
  * 2. 使用启动环境调用 child main class 的 main 方法
  */
@tailrec
private def submit(args: SparkSubmitArguments): Unit = { // 准备提交环境  childMainClass = "org.apache.spark.deploy.yarn.Client" val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) def doRunMain(): Unit = { if (args.proxyUser != null) { } else { runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) } } if (args.isStandaloneCluster && args.useRest) { // 在其他任何模式, 仅仅运行准备好的主类 } else { doRunMain() }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

prepareSubmitEnvironment方法

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) { // 在 yarn 集群模式下, 使用 yarn.Client 来封装一下 user class childMainClass = "org.apache.spark.deploy.yarn.Client"
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

doRunMain方法

def doRunMain(): Unit = { if (args.proxyUser != null) { } else { runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

runMain方法

/**
  * * 使用给定启动环境运行 child class 的 main 方法
  * 注意: 如果使用了cluster deploy mode, 主类并不是用户提供
  */
private def runMain( childArgs: Seq[String], childClasspath: Seq[String], sysProps: Map[String, String], childMainClass: String, verbose: Boolean): Unit = { var mainClass: Class[_] = null try { // 使用反射的方式加载 childMainClass = "org.apache.spark.deploy.yarn.Client" mainClass = Utils.classForName(childMainClass) } catch { } // 反射出来 Client 的 main 方法 val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) if (!Modifier.isStatic(mainMethod.getModifiers)) { throw new IllegalStateException("The main method in the given main class must be static") } try { // 调用 main 方法.  mainMethod.invoke(null, childArgs.toArray) } catch { }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 3. org.apache.spark.deploy.yarn.Client 源码分析

main方法

def main(argStrings: Array[String]) { // 设置环境变量 SPARK_YARN_MODE 表示运行在 YARN mode // 注意: 任何带有 SPARK_ 前缀的环境变量都会分发到所有的进程, 也包括远程进程 System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf // 对传递来的参数进一步封装 val args = new ClientArguments(argStrings) new Client(args, sparkConf).run()
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

Client.run方法

def run(): Unit = { // 提交应用, 返回应用的 id this.appId = submitApplication()
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5

client.submitApplication 方法

/**
  *
  * 向 ResourceManager 提交运行 ApplicationMaster 的应用程序。
  *
  */
def submitApplication(): ApplicationId = { var appId: ApplicationId = null try { // 初始化 yarn 客户端 yarnClient.init(yarnConf) // 启动 yarn 客户端 yarnClient.start() // 从 RM 创建一个应用程序 val newApp = yarnClient.createApplication() val newAppResponse = newApp.getNewApplicationResponse() // 获取到 applicationID appId = newAppResponse.getApplicationId() reportLauncherState(SparkAppHandle.State.SUBMITTED) launcherBackend.setAppId(appId.toString) // Set up the appropriate contexts to launch our AM // 设置正确的上下文对象来启动 ApplicationMaster val containerContext = createContainerLaunchContext(newAppResponse) // 创建应用程序提交任务上下文 val appContext = createApplicationSubmissionContext(newApp, containerContext) // 提交应用给 ResourceManager 启动 ApplicationMaster  // "org.apache.spark.deploy.yarn.ApplicationMaster" yarnClient.submitApplication(appContext) appId } catch { }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

方法: createContainerLaunchContext

private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = { val amClass = if (isClusterMode) {  // 如果是 Cluster 模式 Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName } else { // 如果是 Client 模式 Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName } amContainer
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

至此, SparkSubmit 进程启动完毕.

  • 4. org.apache.spark.deploy.yarn.ApplicationMaster 源码分析

ApplicationMaster伴生对象的 main方法

def main(args: Array[String]): Unit = { SignalUtils.registerLogger(log) // 构建 ApplicationMasterArguments 对象, 对传来的参数做封装 val amArgs: ApplicationMasterArguments = new ApplicationMasterArguments(args) SparkHadoopUtil.get.runAsSparkUser { () => // 构建 ApplicationMaster 实例  ApplicationMaster 需要与 RM通讯 master = new ApplicationMaster(amArgs, new YarnRMClient) // 运行 ApplicationMaster 的 run 方法, run 方法结束之后, 结束 ApplicationMaster 进程 System.exit(master.run()) }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

ApplicationMaster 伴生类的 run方法

final def run(): Int = { // 关键核心代码 try { val fs = FileSystem.get(yarnConf) if (isClusterMode) { runDriver(securityMgr) } else { runExecutorLauncher(securityMgr) } } catch { } exitCode
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

runDriver 方法

private def runDriver(securityMgr: SecurityManager): Unit = { addAmIpFilter() // 开始执行用户类. 启动一个子线程来执行用户类的 main 方法.  返回值就是运行用户类的子线程. // 线程名就叫 "Driver" userClassThread = startUserApplication() val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME) try { // 注册 ApplicationMaster , 其实就是请求资源 registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) // 线程 join: 把userClassThread线程执行完毕之后再继续执行当前线程. userClassThread.join() } catch { }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

startUserApplication 方法

private def startUserApplication(): Thread = { // 得到用户类的 main 方法 val mainMethod = userClassLoader.loadClass(args.userClass) .getMethod("main", classOf[Array[String]]) // 创建及线程 val userThread = new Thread { override def run() { try { // 调用用户类的主函数 mainMethod.invoke(null, userArgs.toArray) } catch { } finally { } } } userThread.setContextClassLoader(userClassLoader) userThread.setName("Driver") userThread.start() userThread
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

registerAM 方法

private def registerAM( _sparkConf: SparkConf, _rpcEnv: RpcEnv, driverRef: RpcEndpointRef, uiAddress: String, securityMgr: SecurityManager) = { // 向 RM 注册, 得到 YarnAllocator allocator = client.register(driverUrl, driverRef, yarnConf, _sparkConf, uiAddress, historyAddress, securityMgr, localResources) // 请求分配资源 allocator.allocateResources()
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

allocator.allocateResources() 方法

/**
  请求资源,如果 Yarn 满足了我们的所有要求,我们就会得到一些容器(数量: maxExecutors)。 通过在这些容器中启动 Executor 来处理 YARN 授予我们的任何容器。 

必须同步,因为在此方法中读取的变量会被其他方法更改。
  */
def allocateResources(): Unit = synchronized { if (allocatedContainers.size > 0) { handleAllocatedContainers(allocatedContainers.asScala) }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

handleAllocatedContainers方法

/**
  处理 RM 授权给我们的容器
  */
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = { val containersToUse = new ArrayBuffer[Container](allocatedContainers.size) runAllocatedContainers(containersToUse)
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

runAllocatedContainers 方法

/**
  * Launches executors in the allocated containers.
  在已经分配的容器中启动 Executors
  */
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = { // 每个容器上启动一个 Executor for (container <- containersToUse) { if (numExecutorsRunning < targetNumExecutors) { if (launchContainers) { launcherPool.execute(new Runnable { override def run(): Unit = { try { new ExecutorRunnable( Some(container), conf, sparkConf, driverUrl, executorId, executorHostname, executorMemory, executorCores, appAttemptId.getApplicationId.toString, securityMgr, localResources ).run()  // 启动 executor updateInternalState() } catch { } } }) } else { } } else { } }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

ExecutorRunnable.run方法

def run(): Unit = { logDebug("Starting Executor Container") // 创建 NodeManager 客户端 nmClient = NMClient.createNMClient() // 初始化 NodeManager 客户端 nmClient.init(conf) // 启动 NodeManager 客户端 nmClient.start() // 启动容器 startContainer()
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

ExecutorRunnable.startContainer()

def startContainer(): java.util.Map[String, ByteBuffer] = { val ctx = Records.newRecord(classOf[ContainerLaunchContext]) .asInstanceOf[ContainerLaunchContext] // 准备要执行的命令 val commands = prepareCommand() ctx.setCommands(commands.asJava) // Send the start request to the ContainerManager try { // 启动容器 nmClient.startContainer(container.get, ctx) } catch { }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

ExecutorRunnable.prepareCommand 方法

private def prepareCommand(): List[String] = { val commands = prefixEnv ++ Seq( YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server") ++ javaOpts ++ // 要执行的类 Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", "--driver-url", masterAddress, "--executor-id", executorId, "--hostname", hostname, "--cores", executorCores.toString, "--app-id", appId) ++ userClassPath ++ Seq( s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout", s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr") commands.map(s => if (s == null) "null" else s).toList
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

至此, ApplicationMaster 进程启动完毕

  • 5. org.apache.spark.executor.CoarseGrainedExecutorBackend 源码分析

CoarseGrainedExecutorBackend 伴生对象
main方法

def main(args: Array[String]) { // 启动 CoarseGrainedExecutorBackend
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  // 运行结束之后退出进程
  System.exit(0)
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

run 方法

 /** 准备 RpcEnv
*/
private def run( driverUrl: String, executorId: String, hostname: String, cores: Int, appId: String, workerUrl: Option[String], userClassPath: Seq[URL]) { SparkHadoopUtil.get.runAsSparkUser { () => val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false) env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env)) }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

CoarseGrainedExecutorBackend 伴生类
继承自: ThreadSafeRpcEndpoint 是一个RpcEndpoint
查看生命周期方法
onStart 方法
连接到 Driver, 并向 Driver注册Executor

override def onStart() { rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) // 向驱动注册 Executor 关键方法 ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { case Success(msg) => case Failure(e) => // 注册失败, 退出 executor  exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false) }(ThreadUtils.sameThread)
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Driver端的CoarseGrainedSchedulerBackend.DriverEndPoint 的 receiveAndReply 方法

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // 接收注册 Executor case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => if (executorDataMap.contains(executorId)) {  // 已经注册过了 } else { // 给 Executor  发送注册成功的信息 executorRef.send(RegisteredExecutor) }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

Eexcutor端的CoarseGrainedExecutorBackend的receive方法

override def receive: PartialFunction[Any, Unit] = { // 向 Driver 注册成功 case RegisteredExecutor => logInfo("Successfully registered with driver") try { // 创建 Executor 对象   注意: Executor 其实是一个对象 executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) } catch { }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

至此, Executor 创建完毕

总结
5
6
7

2.2 Yarn client 模式运行机制源码分析

执行下面的代码:

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

启动类

/opt/module/jdk1.8.0_172/bin/java 
-cp /opt/module/spark-yarn/conf/:/opt/module/spark-yarn/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ 
-Xmx1g 
org.apache.spark.deploy.SparkSubmit 
--master yarn 
--deploy-mode client 
--class org.apache.spark.examples.SparkPi 
./examples/jars/spark-examples_2.11-2.1.1.jar 100


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

依次启动 3 个不同的进程

SparkSubmit
ExecutorLauncher
CoarseGrainedExecutorBackend


  
 
  • 1
  • 2
  • 3
  • 4
  • 1. client 模式下直接运行用户的主类

prepareSubmitEnvironment 方法

/* client 模式下, 直接启动用户的主类
*/
if (deployMode == CLIENT || isYarnCluster) { // 如果是客户端模式, childMainClass 就是用户的类 // 集群模式下, childMainClass 被重新赋值为 org.apache.spark.deploy.yarn.Client childMainClass = args.mainClass
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

然后不会创建ApplicationMaster, 而是直接执行用户类的main方法

然后开始实例化 SparkContext

实例化SparkContext

val (sched, ts):(SchedulerBackend, TaskScheduler) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// 启动 YarnScheduler
_taskScheduler.start()


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

SparkContext.createTaskScheduler 方法

关键代码:
private def createTaskScheduler( sc: SparkContext, master: String, deployMode: String): (SchedulerBackend, TaskScheduler) = { import SparkMasterRegex._ master match { case masterUrl => // 得到的是 YarnClusterManager val cm = getClusterManager(masterUrl) match { case Some(clusterMgr) => clusterMgr case None => throw new SparkException("Could not parse Master URL: '" + master + "'") } try { // 创建 YarnScheduler val scheduler: TaskScheduler = cm.createTaskScheduler(sc, masterUrl) // 创建 YarnClientSchedulerBackend val backend: SchedulerBackend = cm.createSchedulerBackend(sc, masterUrl, scheduler) cm.initialize(scheduler, backend) (backend, scheduler) } catch { } }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

YarnClusterManager 类

private[spark] class YarnClusterManager extends ExternalClusterManager { override def canCreate(masterURL: String): Boolean = { masterURL == "yarn" } override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { sc.deployMode match { case "cluster" => new YarnClusterScheduler(sc) case "client" => new YarnScheduler(sc) case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") } } override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { sc.deployMode match { case "cluster" => new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) case "client" => new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") } } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

_taskScheduler.start()
YarnClientSchedulerBackend 的 start 方法

/**
  * Create a Yarn client to submit an application to the ResourceManager.
  * This waits until the application is running.
  *
  * 创建客户端, 提交应用给 ResourceManager
  * 会一直等到应用开始执行
  */
override def start() { val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ("--arg", hostport) val args = new ClientArguments(argsArrayBuf.toArray) client = new Client(args, conf) // 使用 Client 提交应用 bindToYarn(client.submitApplication(), None) waitForApplication()
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 2. org.apache.spark.deploy.yarn.Client 源码再分析

submitApplication 方法

yarnClient.submitApplication(appContext)

  
 
  • 1

ExecutorLauncher 类
yarnClient 提交应用的时候, 把要执行的主类(ExecutorLauncher)封装到配置中. 所以不是启动ApplicationMaster, 而是启动ExecutorLauncher

// createContainerLaunchContext()
val amClass = if (isClusterMode) { Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName } else { Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName }
/**
  * This object does not provide any special functionality. It exists so that it's easy to tell
  * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
  * * 这个对象不提供任何特定的功能.
  * * 它的存在使得在使用诸如ps或jps之类的工具时,很容易区分客户机模式AM和集群模式AM。
  * */
object ExecutorLauncher { def main(args: Array[String]): Unit = { ApplicationMaster.main(args) }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 3. ApplicationMaster 源码再分析

run 方法

final def run(): Int = { try { if (isClusterMode) { runDriver(securityMgr) } else { // 非集群模式, 直接执行 ExecutorLauncher, 而不在需要运行 Driver runExecutorLauncher(securityMgr) } } catch { } exitCode
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

runExecutorLauncher

private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { val driverRef = waitForSparkDriver() addAmIpFilter() registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) // In client mode the actor will stop the reporter thread. reporterThread.join()
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在以后的执行流程就和yarn-cluster模式一样了. 在此就不再赘述

总结

8

三. Standalone 模式运行机制

Standalone 集群有 2 个重要组成部分,分别是:

  1. Master(RM):是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责;

  2. Worker(NM):是一个进程,一个 Worker 运行在集群中的一台服务器上,主要负责两个职责,

  3. 一个是用自己的内存存储 RDD 的某个或某些 partition

  4. 另一个是启动其他进程和线程(Executor),对 RDD 上的 partition 进行并行的处理和计算。

根据 driver的位置不同, 也分 2 种:

  • 1. Standalone Cluster 模式

9
在Standalone Cluster模式下,任务提交后,Master会找到一个 Worker 启动Driver。

Driver启动后向Master注册应用程序,Master根据 submit 脚本的资源需求找到内部资源至少可以启动一个Executor 的所有Worker,然后在这些 Worker之间分配Executor,Worker上的Executor启动后会向Driver反向注册,所有的 Executor 注册完成后,Driver 开始执行main函数,之后执行到Action算子时,开始划分 tage,每个 Stage 生成对应的taskSet,之后将 Task 分发到各个 Executor 上执行。

  • 2. Standalone Clientr 模式

10
在 Standalone Client 模式下,Driver 在任务提交的本地机器上运行。

Driver启动后向 Master 注册应用程序,Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个Executor 的所有 Worker,然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向Driver反向注册,所有的Executor注册完成后,Driver 开始执行main函数,之后执行到Action算子时,开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行。

  本次的分享就到这里了,


14

  好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
  如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
  码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦!

15
16

文章来源: buwenbuhuo.blog.csdn.net,作者:不温卜火,版权归原作者所有,如需转载,请联系作者。

原文链接:buwenbuhuo.blog.csdn.net/article/details/108077472

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。