// org.apache.spark.ui.jobs.StagesTab
def handleKillRequest(request: HttpServletRequest): Unit = {
  if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
    val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
    val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
    if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) {
      sc.get.cancelStage(stageId)
    }
    // Do a quick pause here to give Spark time to kill the stage so it shows up as
    // killed after the refresh. Note that this will block the serving thread so the
    // time should be limited in duration.
    Thread.sleep(100)
  }
}
// org.apache.spark.SparkContext
/**
 * Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
 * for more information.
 */
def cancelJobGroup(groupId: String) {
  assertNotStopped()
  dagScheduler.cancelJobGroup(groupId)
}

/** Cancel all jobs that have been scheduled or are running. */
def cancelAllJobs() {
  assertNotStopped()
  dagScheduler.cancelAllJobs()
}

/** Cancel a given job if it's scheduled or running */
private[spark] def cancelJob(jobId: Int) {
  dagScheduler.cancelJob(jobId)
}

/** Cancel a given stage and all jobs associated with it */
private[spark] def cancelStage(stageId: Int) {
  dagScheduler.cancelStage(stageId)
}
// org.apache.spark.scheduler.DAGScheduler
/**
 * Cancel a job that is running or waiting in the queue.
 */
def cancelJob(jobId: Int): Unit = {
  logInfo("Asked to cancel job " + jobId)
  eventProcessLoop.post(JobCancelled(jobId))
}

/**
 * Cancel all jobs in the given job group ID.
 */
def cancelJobGroup(groupId: String): Unit = {
  logInfo("Asked to cancel job group " + groupId)
  eventProcessLoop.post(JobGroupCancelled(groupId))
}

/**
 * Cancel all jobs that are running or waiting in the queue.
 */
def cancelAllJobs(): Unit = {
  eventProcessLoop.post(AllJobsCancelled)
}
private[scheduler] def handleStageCancellation(stageId: Int) {
  stageIdToStage.get(stageId) match {
    case Some(stage) =>
      val jobsThatUseStage: Array[Int] = stage.jobIds.toArray
      jobsThatUseStage.foreach { jobId =>
        handleJobCancellation(jobId, s"because Stage $stageId was cancelled")
      }
    case None =>
      logInfo("No active jobs to kill for Stage " + stageId)
  }
  submitWaitingStages()
}
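handleStageCancellation hands each job id to handleJobCancellation, which is not quoted above; it is essentially a lookup plus a call into failJobAndIndependentStages. Roughly (a sketch based on the same 1.x-era sources as the excerpts in this post, details may differ between versions):

// DAGScheduler: cancel one job by failing it together with its independent stages.
private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
  if (!jobIdToStageIds.contains(jobId)) {
    logDebug("Trying to cancel unregistered job " + jobId)
  } else {
    failJobAndIndependentStages(
      jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
  }
  submitWaitingStages()
}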
/** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
private def failJobAndIndependentStages(
    job: ActiveJob,
    failureReason: String,
    exception: Option[Throwable] = None): Unit = {
  val error = new SparkException(failureReason, exception.getOrElse(null))
  var ableToCancelStages = true

  val shouldInterruptThread =
    if (job.properties == null) false
    else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean

  // Cancel all independent, running stages.
  val stages = jobIdToStageIds(job.jobId)
  if (stages.isEmpty) {
    logError("No stages registered for job " + job.jobId)
  }
  stages.foreach { stageId =>
    val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)
    if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {
      logError(
        "Job %d not registered for stage %d even though that stage was registered for the job"
          .format(job.jobId, stageId))
    } else if (jobsForStage.get.size == 1) {
      if (!stageIdToStage.contains(stageId)) {
        logError(s"Missing Stage for stage with id $stageId")
      } else {
        // This is the only job that uses this stage, so fail the stage if it is running.
        val stage = stageIdToStage(stageId)
        if (runningStages.contains(stage)) {
          try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
            taskScheduler.cancelTasks(stageId, shouldInterruptThread) // the key call
            markStageAsFinished(stage, Some(failureReason))
          } catch {
            case e: UnsupportedOperationException =>
              logInfo(s"Could not cancel tasks for stage $stageId", e)
              ableToCancelStages = false
          }
        }
      }
    }
  }
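The excerpt stops before the method's tail; in the version of the sources quoted here it ends roughly as below (a sketch from memory, minor details may differ), failing the job and cleaning up only if every stage could be cancelled:

  if (ableToCancelStages) {
    // Mark the job as failed, drop its bookkeeping, and tell listeners it ended.
    job.listener.jobFailed(error)
    cleanupStateForJobAndIndependentStages(job)
    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
  }
}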
// org.apache.spark.scheduler.TaskSchedulerImpl
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
  logInfo("Cancelling stage " + stageId)
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
    attempts.foreach { case (_, tsm) =>
      // There are two possible cases here:
      // 1. The task set manager has been created and some tasks have been scheduled.
      //    In this case, send a kill signal to the executors to kill the task and then abort
      //    the stage.
      // 2. The task set manager has been created but no tasks have been scheduled. In this case,
      //    simply abort the stage.
      tsm.runningTasksSet.foreach { tid =>
        val execId = taskIdToExecutorId(tid)
        backend.killTask(tid, execId, interruptThread)
      }
      tsm.abort("Stage %s cancelled".format(stageId))
      logInfo("Stage %d was cancelled".format(stageId))
    }
  }
}
// LocalEndpoint (local mode)
override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    reviveOffers()

  case StatusUpdate(taskId, state, serializedData) =>
    scheduler.statusUpdate(taskId, state, serializedData)
    if (TaskState.isFinished(state)) {
      freeCores += scheduler.CPUS_PER_TASK
      reviveOffers()
    }

  case KillTask(taskId, interruptThread) =>
    executor.killTask(taskId, interruptThread)
}
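In local mode the KillTask message reaches this endpoint through the local backend's killTask, which is just a forward; roughly (a sketch based on the 1.x sources, where the class is called LocalBackend; later releases rename it LocalSchedulerBackend):

// LocalBackend: forward the kill request to the in-process LocalEndpoint above.
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
  localEndpoint.send(KillTask(taskId, interruptThread))
}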
In cluster mode
The driver handles it first; the handling logic lives in DriverEndpoint:
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }

  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread) =>
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        // Relay the kill request to the executor that owns the task.
        executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
      case None =>
        // Ignoring the task kill since the executor is not registered.
        logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
    }
}
Then the executor side also handles it (the KillTask case in CoarseGrainedExecutorBackend's receive):
case KillTask(taskId, _, interruptThread) =>
  if (executor == null) {
    exitExecutor(1, "Received KillTask command but executor was null")
  } else {
    executor.killTask(taskId, interruptThread)
  }
// org.apache.spark.SparkContext
/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
  setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
}

/**
 * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
 * different value or cleared.
 *
 * Often, a unit of execution in an application consists of multiple Spark actions or jobs.
 * Application programmers can use this method to group all those jobs together and give a
 * group description. Once set, the Spark web UI will associate such jobs with this group.
 *
 * The application can also use [[org.apache.spark.SparkContext.cancelJobGroup]] to cancel all
 * running jobs in this group. For example,
 * {{{
 * // In the main thread:
 * sc.setJobGroup("some_job_to_cancel", "some job description")
 * sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
 *
 * // In a separate thread:
 * sc.cancelJobGroup("some_job_to_cancel")
 * }}}
 *
 * If interruptOnCancel is set to true for the job group, then job cancellation will result
 * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
 * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
 * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
 */
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
  setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
  setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
  // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
  // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
  // APIs to also take advantage of this property (e.g., internal job failures or canceling from
  // JobProgressTab UI) on a per-job basis.
  setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
}
Spark source code study: job cancel and job desc
The kill button in the Spark web UI is quite handy when a large job is running and you want to cancel it. I recently wanted to integrate this capability into my own project, so I read through the relevant source code. Below are my notes on how Spark cancels a stage and a job, and on how to give a job a name.
The web UI
We start from the web UI page with the kill button. The relevant source is the kill handler, handleKillRequest, in org.apache.spark.ui.jobs.StagesTab (quoted at the top of this post).

SparkContext
Moving into org.apache.spark.SparkContext, the cancel-related code is quoted above. cancelJobGroup is the method we will rely on later, but for now we keep following the cancelStage path.

DAGScheduler
We find that the method being called is cancelStage in org.apache.spark.scheduler.DAGScheduler. Looking at its implementation, it simply posts an event to eventProcessLoop, which is a DAGSchedulerEventProcessLoop whose constructor is handed the current DAGScheduler. The event is handled in DAGSchedulerEventProcessLoop's doOnReceive method. This class is only a relay: it calls DAGScheduler's handleStageCancellation method with the stageId, handing control straight back. From there the call chain eventually reaches taskScheduler.cancelTasks(stageId, shouldInterruptThread) in failJobAndIndependentStages (the key call marked in the code above).
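The two hops described above are tiny; roughly (a sketch based on the same era of the sources as the excerpts in this post, details vary between versions):

// DAGScheduler: cancelling a stage just posts an event to its event loop.
def cancelStage(stageId: Int): Unit = {
  eventProcessLoop.post(StageCancelled(stageId))
}

// DAGSchedulerEventProcessLoop: the relay that dispatches events back into the DAGScheduler.
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  // ... the remaining DAGSchedulerEvent cases are omitted here ...
}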
TaskSchedulerImpl
org.apache.spark.scheduler.TaskSchedulerImpl is the only subclass of org.apache.spark.scheduler.TaskScheduler that implements cancelTasks, so we go straight to it. Its implementation (cancelTasks, quoted above) kills every running task of the stage by calling killTask on the SchedulerBackend and then aborts the task set.
CoarseGrainedSchedulerBackend
Among the subclasses of SchedulerBackend, the one that implements killTask is CoarseGrainedSchedulerBackend: killTask sends a KillTask message to driverEndpoint, and the message ultimately travels over Netty.
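The sending side is a one-liner; roughly (a sketch based on the same era of the sources, the exact signature may differ in other versions):

// CoarseGrainedSchedulerBackend: forward the kill request to the driver endpoint,
// which relays it to the executor running the task (see DriverEndpoint.receive below).
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
  driverEndpoint.send(KillTask(taskId, executorId, interruptThread))
}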
Handling in local mode
LocalEndpoint's receive method handles this message (quoted above).

In cluster mode
The driver handles it first; the logic lives in DriverEndpoint (quoted above). Then the executor side also handles it: the CoarseGrainedExecutorBackend excerpt above ends up calling executor.killTask.
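On the executor, killTask boils down to looking up the running TaskRunner and asking it to stop; roughly (a sketch based on the same sources; interruptThread decides whether the task's thread is interrupted):

// org.apache.spark.executor.Executor: locate the TaskRunner for this task id and kill it.
def killTask(taskId: Long, interruptThread: Boolean): Unit = {
  val taskRunner = runningTasks.get(taskId)
  if (taskRunner != null) {
    taskRunner.kill(interruptThread)
  }
}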
The other cancel paths (cancelJob, cancelJobGroup, cancelAllJobs) follow essentially the same logic.
job desc
That covers the cancel side of the sources; now back to our own cancellation requirement. It turns out cancelStage is not exposed as a public API; what Spark exposes is cancelJobGroup. Following that implementation leads us to setJobGroup and setJobDescription (quoted above). So in our application code we set these two properties, and when we want to cancel we call cancelJobGroup.
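A minimal usage sketch of that idea, based on the scaladoc example above (the group id, description, and thread split are illustrative, not from the original code):

// In the thread that submits the work: tag everything it runs with a group id and a description.
sc.setJobGroup("my-cancellable-group", "nightly aggregation", interruptOnCancel = true)
sc.setJobDescription("count with artificial delay")
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()

// In another thread (e.g. triggered by a UI button or a REST call): cancel the whole group.
sc.cancelJobGroup("my-cancellable-group")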
Final result