cjuexuan / mynote

237 stars 34 forks source link

spark那坑爹的event timeline #44

Open cjuexuan opened 7 years ago

cjuexuan commented 7 years ago

spark那坑爹的event timeline

背景

最近在搞spark的性能分析,数据的获取方式无非是spark的listener去获取到spark的event,但是spark webUI的Event timeline看着挺好,其实真的很有欺骗性

比如在下面这个case中,我们的资源是单个executor,单个core,两个task的case,图中两个task居然产生了交集,这就有点影响认知了,所以我就去跟踪了下完整的流程

eventTimeLine

数据的最终来源

这边大致抽象spark在提交过程中TaskInfo和TaskMetric的创建链路,首先找到TaskInfo的创建代码

  1. TaskSetManager.resourceOffer,这个方法会调用到dequeueTask这个方法,如果有找到task,那么就会创建TaskInfo,此时这个Task的launchTime就已经确定了
  2. 我们顺着这个方法找到Usage,最终找到了driver端的DriverEndpoint这个类的receiveAndReply方法,周期性处理ReviveOffers这个消息,最终调用到makeOffer方法
  3. makeOffer里调用到launchTask,进行序列化,这部分是有些开销,在这个方法的最后将调用executorEndpoint,通过rpc请求将task发送到executor中去
  4. executor中调用launchTask,将TaskRunner放到threadPool中去TaskRunner继承了Runnable接口,所以我们关注于该类的run方法
  5. 在进入TaskRunner的run方法中,首先标记了反序列化开始(deserializeStartTime)的时间戳,接着在任务开始时,设置了task开始时间戳(taskStart),在task的run的过程中,设置了executorDeserializeTime(这里已经是时长了),task结束,设置了task结束时间戳(taskEnd)
  6. spark将taskFinish - taskStart-task.executorDeserializeTime(时长)看作是executorRuntime,将taskStart-deserializeStartTime +task.executorDeserializeTime(时长)看作总的反序列化时间
  7. 在执行完task之后,需要进行一次序列化,这里打了两个时间戳(beforeSerialization和afterSerialization),差值作为结果序列化时间(resultSerializationTime)
  8. 接着调用ExecutorBackend.statusUpdate,将结果(StatusUpdate)发回给driver
  9. 我们再将目光切回driver,在CoarseGrainedSchedulerBackend用scheduler.statusUpdate处理发回来的StatusUpdate,结束之后这里会将exeutor标记Free并且makeOffer一个新的任务
  10. scheduler.statusUpdate如果任务结束,则会先清空Task的状态(cleanupTaskState),如果成功,会走到taskResultGetter.enqueueSuccessfulTask,这个方法会判断发回来的数据是否可以直接反序列化,还是通过blockManager进行获取,如果走blockManager,那么还有一个gettingResult的时间,这里最终走到scheduler.handleSuccessfulTask
  11. scheduler.handleSuccessfulTask走到taskSetManager.handleSuccessfulTask,在第一行将taskInfo标记成finish状态,此时产生了一个task的最终结束时间
  12. 通知各种listener,task已经结束了

webUI中的数据

刚才说那个数据不准,那么到底哪里不准呢,首先,任务开始是一个时间戳,任务结束是一个时间戳,由于各个环节有严格时序,比如首先进行的是任务序列化,rpc到executor中,接着executor中进行反序列化,计算,结果序列化,发回driver,driver判断需要走blockManager取数据,则产生获取结果的时间,标记最终结束,真正在executor上的部分是executor反序列化,计算,结果序列化,这些最终变成了TaskMetric中的时长,所以spark在webUi中展示的时候,将数据从尾端放,先放gettingResult时长(如果有),再放结果序列化时长,再放计算时长,再放executor反序列化时长,那么还和task开始有一截,这部分就给了schedulerDelay,具体代码在StagePage中

  private[ui] def getSchedulerDelay(
      info: TaskInfo, metrics: TaskMetricsUIData, currentTime: Long): Long = {
    if (info.finished) {
      val totalExecutionTime = info.finishTime - info.launchTime
      val executorOverhead = metrics.executorDeserializeTime +
        metrics.resultSerializationTime
      math.max(
        0,
        totalExecutionTime - metrics.executorRunTime - executorOverhead -
          getGettingResultTime(info, currentTime))
    } else {
      // The task is still running and the metrics like executorRunTime are not available.
      0L
    }
  }

实验

实验一

首先开启yarn-client 的debug模式,将断点打在

debug1

进入断点之后,等个10s,释放第一个task,再过10s,释放第二个task,进入webUI

timeline1

我们发现两个task其实是在markFinish的过程中被阻塞,但时间都算到前面去了,由于我们是1个executor1个core,task是两个,所以在检查资源的过程中第二个task由于拿不到资源所以迟迟不开始

实验二

我们将断点打到TaskResultGetter.enqueueSuccessfulTask,这样相当于卡住任务的结束,但TaskInfo会正常的new出第二个Task

debug2

在卡住这个断点10s后,禁用所有断点,并且释放两个断点,这样,第一个task就和第二个task几乎同时结束(原谅我的手速)

timeline2

这是因为我们卡住了两个任务的结束时间,但开始时间其实已经产生了,所以才会有这么大的重叠

实验三

由于产生taskInfo和markFinish是在两个线程,我们避免在外层有锁的地方打断点,所以将断点打到TaskResultGetter.enqueueSuccessfulTask和TaskSetManager.resourceOffer上,注意Suspend设置成Thread级别的,我们放掉第一个Task的创建,卡住第一个Task的结束,并且卡一会第二个Task的创建,最终同时释放所有的断点

debug3

最终发现这两个Task结束时间几乎一致

总结

在监控和使用数据的过程中,遇到时长的,千万别当真:(

klion26 commented 7 years ago

spark将taskStart-taskEnd-task.executorDeserializeTime(时长)看作是executorRuntime,将taskStart-deserializeStartTime +task.executorDeserializeTime(时长)看作总的反序列化时间

这里是 taskEnd - taskStart?

cjuexuan commented 7 years ago

@klion26 是的,有些笔误,具体的代码在TaskRunner的run方法中,感谢指出