cjuexuan / mynote

237 stars 34 forks source link

spark on yarn提交后不执行问题分析 #42

Open cjuexuan opened 7 years ago

cjuexuan commented 7 years ago

spark提交后不执行问题分析

现象

今天早上在测试一个spark的job的过程中发现了一个诡异的问题,首先我任务提交之后一直拿不到资源,并且在日志中有如下警告

 WARN (org.apache.spark.scheduler.cluster.YarnScheduler:66) - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

jobs

我发现在executors里面只有driver启动了

executors

首先我怀疑的是资源设置的太大了,于是在env中看了下申请的executor的资源,也不是很大,而且那时候测试集群还有很多空闲资源,所以剩下的思路只能是看那行警告了

image

警告附近的源代码

查看spark源码,发现这行警告是在org.apache.spark.scheduler.TaskSchedulerImplsubmitTasks 这个方法中抛出的,抛出的逻辑是如果hasLaunchedTask为false的情况下,会打出警告,这个变量初始化是false的,唯一改变的方法是resourceOffers

    // submitTasks 方法
     override def submitTasks(taskSet: TaskSet) {
     ...
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
          // 抛出警告
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()

    //resourceOffers
     def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { 
     ...
     if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
    }

本地debug

我们自己实现的一个对spark的提交的封装框架中能很容易的实现对本地debug(yarn-client模式下) 具体细节可以参考spark on yarn一些实践

yarnClient

我们在TaskSchedulerImpl中设置了一些断点,发现executorIdToHost始终为空,和现象是一致的

image

我们打在resourceOffers里面的断点也始终没进去,基本能断定我们executor的参数有问题

resourceOffers

再次审视config

    <bean id="spoorBackup2Hdfs" class="com.ximalaya.data.task.spark.BaseTimeRangeSparkTask">
    <!--<bean id="spoorBackup2Hdfs" class="com.ximalaya.spoor.collect.spark.data.task.SpoorBaseTimeRangeSparkTask">-->
        <property name="executorCores" value="${spark.yarn.executorCores}"/>
        <property name="driver" value="com.ximalaya.spoor.task.driver.SpoorBackupDriver"/>
        <property name="executorMemory" value="${spark.yarn.executorMemory}"/>
<!-- 我们没有配置executorNum,这一行刚才运行的过程中并不存在       <property name="executorNums" value="${spark.yarn.executorNums}"/> -->
        <property name="driverMemory" value="${spark.yarn.driverMemory}"/>
        <property name="statTimeUnit" value="DAY" />
        <property name="preTraceCount" value="0"/>
        <property name="groupName" value="spoor"/>
        </bean>

于是看了下框架默认值

trait BaseSparkTaskSubmitter {
    private val logger = LoggerFactory.getLogger(classOf[BaseSparkTaskSubmitter])

    private var yarnClient: Boolean = false
    //居然是0,看了下commit log,居然是我们故意改的,框架最开始这个默认值是1,这里卖个关子
    private var executorNums: Int = 0
    private var driverMemory: String = "1g"
    private var executorMemory: String = "2g"
    private var executorCores: Int = 1
    ...
    }

改完配置重新验证

我们发现断点也进去了,hashMap也不为空了,程序也成功了

fixed

watch了下hasLaunchedTask,成功变成true

watch

为啥将executorNum的默认值改成0

看提交记录里面,我们是因为dynamicAllocation才改的0,看了下2.2的代码,这也不需要改0,于是我对比了看了下1.5的spark代码,这部分代码在org.apache.spark.util.Utils

2.2vs1.5

看到真相的我眼泪掉下来,默默重新把默认值改成了1,啥都不说了,扎心了

Rayn-liuwei commented 7 years ago

__

cjuexuan commented 7 years ago

@Rayn-liuwei 这是什么意思?

da-liii commented 6 years ago

@cjuexuan 大概就是看参考答案并不是一个好习惯的意思