def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
}
}
schedulableBuilder.buildPools()
}
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
case _ =>
val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
throw new IllegalArgumentException(msg)
}
}
前言
spark应用程序的调度体现在两个地方,第一个是Yarn对spark应用间的调度,第二个是spark应用内(同一个SparkContext)的多个TaskSetManager的调度,这里暂时只对应用内部调度进行分析。
spark的调度模式分为两种:FIFO(先进先出)和FAIR(公平调度)。默认是FIFO,即谁先提交谁先执行,而FAIR支持在调度池中再进行分组,可以有不同的权重,根据权重、资源等来决定谁先执行。spark的调度模式可以通过spark.scheduler.mode进行设置。
调度池初始化
在DAGScheluer对job划分好stage并以TaskSet的形式提交给TaskScheduler后,TaskScheduler的实现类会为每个TaskSet创建一个TaskSetMagager对象,并将该对象添加到调度池中:
schedulableBuilder是SparkContext 中newTaskSchedulerImpl(sc)在创建TaskSchedulerImpl的时候通过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。
可以看到程序会根据配置来创建不同的调度池,schedulableBuilder有两种实现,分别是FIFOSchedulableBuilder和FairSchedulableBuilder,接着后面调用了schedulableBuilder.buildPools(),我们来看两者都是怎么实现的。
FIFOSchedulableBuilder啥也没干。
可以看到FairSchedulableBuilder的buildPools方法中会先去读取FAIR模式的配置文件默认位于SPARK_HOME/conf/fairscheduler.xml,也可以通过参数spark.scheduler.allocation.file设置用户自定义配置文件。 模板如下:
其中:
FAIR可以配置多个调度池,即rootPool里面还是一组Pool,Pool中包含了TaskSetMagager。 FairSchedulableBuilder会根据配置文件创建buildFairSchedulerPool。
根据每个字段值(未设置则为默认值)来实例化一个Pool对象,并添加到rootPool中。
一个spark应用程序包含一个TaskScheduler,一个TaskScheduler包含一个唯一的RootPool,FIFO只有一层Pool,包含TaskSetMagager,而FARI包含两层Pool,RootPool包含子Pool,子Pool包含TaskSetMagager,RootPool都是在实例化SchedulableBuilder的时候创建的。
若根据配置文件创建的调度池中没有一个名字为default的调度池,则会创建一个所有参数都是默认值的名字为default的调度池。
调度池添加TaskSetMagager
两种调度模式的最终实现都是一样,不过FAIR会在添加之前会获取需要使用的调度池,默认为名字为default的调度池。
添加一个TaskSetMagager的时候会添加到队列的尾部,获取是从头部获取。对于FIFO而言,parentPool都是RootPool,而FAIR,TaskSetMagager的parentPool都是RootPool的子Pool。
调度池对TaskSetMagager排序算法
TaskScheduler通过SchedulerBackend拿到的executor资源后,会对所有TaskSetMagager进行调度。通过rootPool.getSortedTaskSetQueue来获取排序后的TaskSetMagager。
可见排序核心的算法在taskSetSchedulingAlgorithm.comparator里,而两种模式的taskSetSchedulingAlgorithm对应的实现也不一样:
FIFO模式的算法类是FIFOSchedulingAlgorithm,FAIR模式的算法实现类是FairSchedulingAlgorithm。下面看两种模式下的比较函数的实现,FIFO:
下面看FAIR的排序算法:
在FAIR模式中,需要先对子Pool进行排序,再对子Pool里面的TaskSetMagager进行排序,因为Pool和TaskSetMagager都继承了Schedulable特质,都是用的是FairSchedulingAlgorithm.FairSchedulingAlgorithm算法。