cjuexuan / mynote

237 stars 34 forks source link

spark 的System.gc分析 #34

Open cjuexuan opened 7 years ago

cjuexuan commented 7 years ago

spark 的System.gc分析

最近发现有的时候driver响应比较慢,所以把gc的log开了一下,还是发现了一些问题的,其中有个full gc的地方比较有趣

gc log

[GC concurrent-root-region-scan-start]
[GC concurrent-root-region-scan-end, 0.1297520 secs]
[GC concurrent-mark-start]
[GC pause (G1 Evacuation Pause) (young) 22G->19G(29G), 0.1507299 secs]
[GC pause (G1 Evacuation Pause) (young) 22G->20G(29G), 0.1308552 secs]
[GC concurrent-mark-end, 7.0112145 secs]
[GC remark, 1.1949356 secs]
[GC cleanup 21G->21G(29G), 0.0191318 secs]
[GC pause (G1 Evacuation Pause) (young) 25G->20G(29G), 0.2027901 secs]
[GC pause (G1 Evacuation Pause) (young) 22G->20G(29G), 0.1766263 secs]
[GC pause (G1 Evacuation Pause) (young) (initial-mark) 23G->20G(29G), 0.1826807 secs]
[GC concurrent-root-region-scan-start]
[GC concurrent-root-region-scan-end, 0.1160003 secs]
[GC concurrent-mark-start]
[Full GC (System.gc())  22G->18G(30G), 51.1216201 secs]
[GC concurrent-mark-abort]

driver在还剩22G的时候触发了一次接近一分钟的full gc,对于一个常驻服务来说,这样不是特别好的,而且触发的方式是System.gc,所以准备分析下什么代码会调用到这么暴力的代码,在我印象里面,通常只有压测之类的,预热的时候会调用一把

相关代码准备

先用idea的find in path找出System.gc这个代码在我工程出现的地方,其中主要关注的是spark中调用的代码,然后我们发现spark中在Benchmark.scala中调用到了,这也正常,在ContextCleaner.scala也调用到了,原来硕鼠在这里

idea

居然起了个线程池周期性的在调用:

  def start(): Unit = {
    cleaningThread.setDaemon(true)
    cleaningThread.setName("Spark Context Cleaner")
    cleaningThread.start()
    periodicGCService.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = System.gc()
    }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
  }

那我们看下调用周期相关的设置

  private val periodicGCInterval =
sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")

那就是driver启动30分钟以后,每30分钟触发一次System.gc,于是我去spark web ui上dump了下线程堆栈,发现这个线程池只会在driver中有,在executor上是没有的

image

分析与总结

首先,ContextCleaner是否被初始化是由spark.cleaner.referenceTracking决定的,而且默认是开启的,这个类将负责比如RDD,Shuffle,Broadcast等的清理动作

代码如下

    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

接下来,由于cleaner被包装成Option,在RDDpersist方法中将调用到,一旦cleaner非None,那么在当第一次将RDD标记成persist的时候,将会在ContextCleaner上注册进去,注册的逻辑也很清晰,向referenceBuffer里加入一个CleanupTaskWeakReference

然后这个类有个死循环的方法keepCleaning,真正执行清理逻辑

那么为什么会周期性的调用System.gc呢,从文档中我们能看出主要是为了触发那些弱引用的对象的垃圾回收,因为被弱引用关联的对象只能生存到下一次gc之前,当垃圾收集器工作时,无论当前的内存是否足够,都会回收掉那些只被弱引用关联的对象,在长时间运行的application中(如果内存很大),这些对象在driver的jvm中只占用了很少的内存,所以很少会发生,甚至不发生gc,本质上clean还是会触发executor端去回收这些对象,如果一直不清除可能会造成executor空间不足

所以出于这个考虑,周期性的调用System.gc来清除那些不再使用的RDD等,最终目的还是通过Rpc调用去clean executor

相关代码:

ContextCleaner.doCleanupRDD :

  def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning RDD " + rddId)
      sc.unpersistRDD(rddId, blocking)
      listeners.asScala.foreach(_.rddCleaned(rddId))
      logInfo("Cleaned RDD " + rddId)
    } catch {
      case e: Exception => logError("Error cleaning RDD " + rddId, e)
    }
  }

SparkContext.unpersistRDD :

  private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
    env.blockManager.master.removeRdd(rddId, blocking)
    persistentRdds.remove(rddId)
    listenerBus.post(SparkListenerUnpersistRDD(rddId))
  }

BlockManagerMaster.removeRdd :

  /** Remove all blocks belonging to the given RDD. */
  def removeRdd(rddId: Int, blocking: Boolean) {
    val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
    future.onFailure {
      case e: Exception =>
        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
    }(ThreadUtils.sameThread)
    if (blocking) {
      timeout.awaitResult(future)
    }
  }

后记

策略选择还是很纠结的,如果disable了,nio的代码也可能有问题,不过我们服务总是要周期性重启的,所以现在先禁用看两天效果,当然只能是driver的代码设置,executor肯定不能设置,如果后面有oom就调大interval吧