Spark任务调度 | Spark,从入门到精通
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)
如上图所示是 Spark 的执行过程,那么具体 Drvier 是如何把 Task 提交给 Executor 的呢?本文将通过 DAGScheduler 、TaskScheduler、调度池和 Executor 四部分介绍 Spark 的任务调度原理及过程。
DAGScheduler
Spark 任务调度中各个 RDD 之间存在着依赖关系,这些依赖关系就形成有向无环图 DAG,DAGScheduler 负责对这些依赖关系形成的 DAG 并进行 Stage 划分,而 DAGScheduler 分为创建、Job 提交、Stage 划分、Task 生成四个部分。
DAGScheduler 创建
private[spark]class DAGScheduler( private[scheduler] val sc: SparkContext, private[scheduler] val taskScheduler: TaskScheduler, listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, env: SparkEnv, clock: Clock = new SystemClock())extends Logging {def this(sc: SparkContext, taskScheduler: TaskScheduler) = { this( sc, taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) } ..... }
DAGScheduler 在 SparkContext 中创建,并且需要提供 TaskScheduler 的实例。在构造函数中的 MapOutputTrackerMaster 是运行在 Driver 端用来管理 ShuffleMapTask 的输出,下游的 Task 可以通过 MapOutputTrackerMaster 来获取 Shuffle 输出的位置信息。
private[spark]class DAGScheduler( private[scheduler] val sc: SparkContext, private[scheduler] val taskScheduler: TaskScheduler, listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, env: SparkEnv, clock: Clock = new SystemClock()) extends Logging { ..... private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) ..... }
DAGScheduler 是基于 Akka Actor 的消息传递机制来构建事件循环处理逻辑,如上段代码所示,在 DAGScheduler 初始化时创建了 eventProcessLoop 以处理各种 DAGSchedulerEvent,这些事件包括作业的提交、任务状态的变化、监控等。
Job 提交
上图所示是 RDD 的 count 执行调用过程。其中,在 DAGScheduelr 的 submitJob 方法中会生成 JobId,并创建一个 JobWaiter 监听 Job 是否执行成功。一个 Job 内包含多个 Task,只有所有 Task 都执行成功该 Job 才会被 JobWaiter 标记为 Succes。
Stage 划分
用户提交的计算任务是由多个 RDD 构成的 DAG, 当 RDD 在转换时需要进行 Shuffle,Shuffle 的过程中就将这个 DAG 划分成了多个 Stage。
由于后面的 Stage 需要前面的 Stage 提供 Shuffle 的结果,因此不同的 Stage 不能并行计算。那么 RDD 在哪些操作时需要进行 Shuffle 呢?这里涉及到 RDD 的两种依赖关系:宽依赖与窄依赖。上图左侧所示为窄依赖,由于 RDD 每个 partition 依赖固定数量的 parent RDD 的 partition,所以可以通过 Task 来处理这些 partition。而且这些 partition 相互独立,所以 Task 可以并行计算。宽依赖反之。
让我们举例说明 Stage 的划分过程,上图所示从触发 Action 的 RDD G 开始划分,G 依赖 B 和 F,处理 B 和 F 的顺序是随机的,假设先处理 B。由于 G 和 B 是窄依赖关系,可以划分到同一个 Stage 。接着处理 F,此时 F 和 G 是宽依赖关系,所以将 F 划分到一个新的 Stage,以此类推划分其它 Stage。
接着以 Stage 1 为例看它的计算方式,如图 4 所示 RDD A 有三个 Partition,因此会生成三个 ShuffleMapTask,这三个 Task 会把结果输出到三个 Partition 中。
Task 生成
任务生成首先要获取需要计算的 Partition,如果是最后的 Stage 所对应的 Task 是 ResultTask,那么先判断 ResultTask 是否结束,若结束则无需计算;对于其它 Stage 对应的都是 ShuffleMapTask,因此只需要判断 Stage 中是否有缓存结果。判断出哪些 Partition 需要计算后生成对应的 Task,然后封装到相应的 TaskSet 中,并提交给 TaskScheduler。TaskSet 中包含了一组处理逻辑完全相同的 Task,但它们的处理数据不同,这里的每个 Task 负责一个 partition。
TaskScheduler
TaskScheduler 是在 SparkContext 中通过 createTaskScheduler 把引用传给 DAGScheduler 的构造函数。每个 TaskScheduler 都会对应一个 SchedulerBackend,TaskScheduler 负责 Application 中不同 job 之间的调度,在 Task 执行失败时启动重试机制,并且为执行速度慢的 Task 启动备份的任务;而 SchdulerBackend 负责与 Cluster Manager 交互,获取该 Application 分配到的资源,然后传给 TaskScheduler。
TaskScheduler 执行流程主要分成两个部分:Driver 端执行和 Executor 执行,他们的执行步骤分别如下:
Driver 端执行
TaskSchedulerImpl#submitTasks 将Task加入到TaskSetManager当中
ScheduleBuilder#addTaskSetManager 根据调度优先级确定调度顺序
CoarseGrainedSchdulerBackend#reviveOffers
DriverActor#makeOffers
TaskSchedulerImpl#resourceOffers 响应资源调度请求,为每个Task分配资源
DriverActor#launchTasks 将tasks发送到Executor
Executor上执行
ReceiveWithLogging#launchTasks
Executor#launchTask
调度池
调度池顾名思义就是存放了一堆待执行的任务,它决定 TaskSetManager 的调度顺序,然后由 TaskSetManager 根据就近原则来确定 Task 运行在哪个 Executor。
那么它是如何决定 TaskSetManager 的调度顺序的呢? 调度池主要有两个决策策略:FIFO 和 FAIR。
首先以整体看 FIFO 和 FAIR 的执行对比图,可以看出在左侧 FIFO 只有一个调度池,即 rootPool,里面包含了待调度的 TaskSetManager;而右侧 FAIR 在 rootPool 调度池中包含了多个子调度池,比如图中的 production 和 test 调度池。
在 FIFO 算法中需要保证 JobId 比较小的优先执行,如果是同一个 Job 则 StageId 比较小的先被调度。FAIR 算法则提供参数配置,如下图所示是一份配置文件:
接着看看我们的 Spark 集群是如何配置的。
private[spark] trait SchedulingAlgorithm { def comparator(s1: Schedulable, s2: Schedulable): Boolean}private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val priority1 = s1.priority val priority2 = s2.priority var res = math.signum(priority1 - priority2) if (res == 0) { val stageId1 = s1.stageId val stageId2 = s2.stageId res = math.signum(stageId1 - stageId2) } res < 0 } }private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val minShare1 = s1.minShare val minShare2 = s2.minShare val runningTasks1 = s1.runningTasks val runningTasks2 = s2.runningTasks val s1Needy = runningTasks1 < minShare1 val s2Needy = runningTasks2 < minShare2 val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0) val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0) val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble var compare = 0 if (s1Needy && !s2Needy) { return true } else if (!s1Needy && s2Needy) { return false } else if (s1Needy && s2Needy) { compare = minShareRatio1.compareTo(minShareRatio2) } else { compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) } if (compare < 0) { true } else if (compare > 0) { false } else { s1.name < s2.name } } }
首先获取 S1 和 S2 两个调度池中的运行状态 Task 个数,若 S1 的运行状态 Task 数小于该调度池的最小资源数,而 S2 相反,那么此时优先调度 S1 中的 Task;如果 S1 和 S2 中的运行状态 Task 数都小于该调度池的最小资源数,那么就依据资源占用率决定调度优先级;如果 S1、S2 的运行状态 Task 数都大于所属调度池的最小资源数,那么就对比它们的已运行 task 个数与分配权重的比例,得出来比例较小的优先调度。
Executor
如上图所示,Executor 是在 worker 收到 master 的 LaunchExecutorde 消息后创建的。在 TaskScheduler 阶段提交 Task 之后 Driver 会序列化封装 Task 的依赖文件和自身信息,然后在 Executor 上反序列化得到 Task。在准备好了 Task 的执行环境之后就通过 TaskRunner 去执行计算,得到执行状态。值得注意的是,在得到计算结果发回 Driver 的过程中,如果文件太大会被直接丢弃(可以通过 spark.driver.maxResultSize 来设定大小)。
本文转载自异步社区。
原文链接
https://www.epubit.com/articleDetails?id=Nf2265a66-d3da-457e-bf8f-4967914a3d55
- 点赞
- 收藏
- 关注作者
评论(0)