hashicorp / nomad-spark

DEPRECATED: Apache Spark with native support for Nomad as a scheduler

When a job with dynamic resource allocation is near completion, many "ERROR TaskSchedulerImpl: Lost executor" messages appear #14

Closed tantra35 closed 6 years ago

tantra35 commented 6 years ago

When we use the following Spark conf options in spark-submit (i.e. with dynamic allocation enabled):

spark.dynamicAllocation.enabled          true
spark.dynamicAllocation.initialExecutors  2
spark.shuffle.service.enabled            true
spark.executor.instances                  2 

When the job is close to finishing we see many

ERROR TaskSchedulerImpl: Lost executor

errors. As a result, Spark begins to lose tasks

WARN TaskSetManager: Lost task 371.3 in stage

and sometimes the Spark job fails because a task is lost after 4 attempts (the Spark default). As I understand it, this happens because near the end of the job Nomad (through Spark) begins to shrink the pool of Spark executors while Spark is still placing tasks on them. As a result, a task can sometimes exhaust all of its attempts when Spark tries to place it on executors that have already been killed.

For now our workaround is to increase spark.task.maxFailures to 20, or to not use dynamic resource allocation, because Nomad can't shrink a job by simply removing allocations by ID.

tantra35 commented 6 years ago

After some investigation, maybe setExecutorCount in resource-managers\nomad\src\main\scala\org\apache\spark\scheduler\cluster\nomad\SparkNomadJobController.scala should be rewritten like this

  def setExecutorCount(count: Int): Unit = {
    jobManipulator.updateJob(startIfNotYetRunning = count > 0) { job =>
      val l_tg = SparkNomadJob.find(job, ExecutorTaskGroup).get

      if (l_tg.getCount() < count)
        l_tg.setCount(count)
    }
  }

???

barnardb commented 6 years ago

Thanks for looking into this. This is fixed in the *-20180722 releases.

tantra35 commented 6 years ago

After some investigation, we found that setExecutorCount still has an error, because there is no way to set the group count to 0, so the real patch would be as follows:

  def setExecutorCount(count: Int): Unit = {
    jobManipulator.updateJob(startIfNotYetRunning = count > 0) { job =>
      val executorGroup = SparkNomadJob.find(job, ExecutorTaskGroup).get
      if (count > 0 && executorGroup.getCount < count) {
        executorGroup.setCount(count)
      } else if (count == 0) {
        executorGroup.setCount(0)
      }
    }
  }
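
To make the intended behaviour easier to check in isolation, here is a minimal sketch of the count rule from the patch above, written as a pure function. ExecutorCountRule and nextExecutorCount are hypothetical names used only for illustration and are not part of nomad-spark. The sketch models only the decision about the group count and leaves out jobManipulator and the Nomad job types.

```scala
// Hypothetical helper, not part of nomad-spark: models only the count rule
// from the patch above, without jobManipulator or the Nomad job types.
object ExecutorCountRule {
  // current:   the executor task group's present count
  // requested: the count passed to setExecutorCount
  def nextExecutorCount(current: Int, requested: Int): Int =
    if (requested > 0 && current < requested) requested // grow the group
    else if (requested == 0) 0                          // explicit request to stop all executors
    else current                                        // never shrink to a non-zero count
}
```

With this rule, a request to go from 5 executors to 2 leaves the count at 5, and only a request for 0 brings the group down.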

barnardb commented 6 years ago

I see you raised #17 for that. Thanks again. Fix coming.