apache-spark-on-k8s / spark

Apache Spark enhanced with a native Kubernetes scheduler back-end. NOTE: this repository is being ARCHIVED, as all new development for the Kubernetes scheduler back-end now happens at https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

PySpark Pi OOM #515

Open duyanghao opened 7 years ago

duyanghao commented 7 years ago

pi.py:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
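
For context, the job was submitted along these lines. This is a sketch reconstructed from this fork's documentation, not the exact command used; the API server address, namespace, and image names/tags are placeholders and assumptions:

bin/spark-submit \
  --deploy-mode cluster \
  --master k8s://https://<k8s-apiserver>:<port> \
  --kubernetes-namespace default \
  --conf spark.app.name=python-pi \
  --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver-py:<tag> \
  --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor-py:<tag> \
  local:///opt/spark/examples/src/main/python/pi.py 100000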

When I run the Pi example via PySpark with 100000 partitions, the driver aborts as below:

Exception in thread "main" org.apache.spark.SparkUserAppException: User application exited with 137
        at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:103)
        at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)

The driver container is OOM-killed (exit code 137), as shown in dmesg (dmesg_all):

[6928402.630093] python invoked oom-killer: gfp_mask=0xd0, order=0, oom_score_adj=841
...
[6928402.632421] Task in /docker/5bfb3307ae243b4d8f883ff0248edb7ebdf8c5e68c9058759f2fbaa79d377729 killed as a result of limit of /docker/5bfb3307ae243b4d8f883ff0248edb7ebdf8c5e68c9058759f2fbaa79d377729
[6928402.632422] memory: usage 23068672kB, limit 23068672kB, failcnt 34577
...
[6928402.632632] Memory cgroup out of memory: Kill process 25461 (python) score 1748 or sacrifice child
[6928402.632633] Killed process 24679 (python) total-vm:78213156kB, anon-rss:22601784kB, file-rss:2752kB

Additionally, both the driver and the executors are allocated 2 cores and 20 GB of memory, as below:

--conf spark.driver.memory=20480m \
--conf spark.driver.cores=2 \
--conf spark.executor.instances=2 \
--conf spark.executor.memory=20480m \
--conf spark.executor.cores=2 \

SparkPi.scala :

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples

import scala.math.random

import org.apache.spark.sql.SparkSession

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / (n - 1))
    spark.stop()
  }
}
// scalastyle:on println

But I can successfully run Spark Pi from the Scala jar with the same 100000 partitions, and with far less memory allocated than for PySpark:

--conf spark.driver.memory=1024m \
--conf spark.driver.cores=2 \
--conf spark.executor.instances=2 \
--conf spark.executor.memory=1024m \
--conf spark.executor.cores=2 \

I am not familiar with PySpark: does PySpark incur a substantial overhead, especially in memory?
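
One difference between the two programs is worth noting here: SparkPi.scala caps n at Int.MaxValue ("avoid overflow"), while pi.py computes n = 100000 * partitions with no cap, so 100000 partitions means range(1, 10**10 + 1). On Python 2, range() materializes that entire list in the driver's Python process before parallelize() can ship it out. A minimal sketch of a lazier variant (an illustration, not a fix confirmed in this thread):

# xrange is lazy on Python 2 (as range is on Python 3), and PySpark's
# parallelize() can slice an xrange without materializing it, so the
# driver never holds all n numbers at once.
count = spark.sparkContext.parallelize(xrange(1, n + 1), partitions) \
    .map(f).reduce(add)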

mccheah commented 7 years ago

PySpark has more memory overhead than Spark in Java or Scala because Python objects are stored off the JVM's heap. You probably have to increase the executor and driver memory overhead so that the pods have enough RAM for these objects.

ifilonenko commented 7 years ago

@duyanghao If the memory overhead is not set properly, the JVM will eat up all the memory and not leave enough for PySpark to run. This is solved by increasing the driver and executor memory overhead. I would recommend looking at this talk, which elaborates on the reasons PySpark hits OOM issues. I believe this is more a question of Spark Core tuning, regardless of the resource manager.
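
For reference, in this fork the pod's memory limit is the JVM heap (spark.driver.memory / spark.executor.memory) plus the memory overhead, and the Python workers have to fit inside that overhead. A minimal sketch of the sizing rule (the default of max(384 MiB, 10% of the heap) is an assumption about this fork, but it matches the cgroup limit in the dmesg output above):

def pod_limit_mib(heap_mib, overhead_mib=None):
    # Assumed rule: pod limit = heap + overhead; default overhead is
    # max(384 MiB, 10% of the heap). Python memory lives in the overhead.
    if overhead_mib is None:
        overhead_mib = max(384, heap_mib // 10)
    return heap_mib + overhead_mib

print(pod_limit_mib(20480))  # 22528 MiB = 23068672 kB, the dmesg limit above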

ifilonenko commented 7 years ago

@duyanghao Any status on this issue?

duyanghao commented 7 years ago

@ifilonenko @mccheah I tried increasing the overhead as you suggested, but it still fails, as below:

  1. Submit with memoryOverhead:

    --conf spark.driver.memory=1024m \
    --conf spark.driver.cores=2 \
    --conf spark.executor.instances=2 \
    --conf spark.executor.memory=1024m \
    --conf spark.executor.cores=2 \
    --conf spark.kubernetes.driver.memoryOverhead=10240m \
    --conf spark.kubernetes.executor.memoryOverhead=10240m \
    ...
  2. Driver and executor pod resources:

    Limits:
      cpu:      1
      memory:   11Gi
    Requests:
      cpu:              1
      memory:           1Gi
  3. The driver container aborts with an OOM kill (137), as below (dmesg_all):

    Exception in thread "main" org.apache.spark.SparkUserAppException: User application exited with 137
    at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:103)
    at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
[6928402.630093] python invoked oom-killer: gfp_mask=0xd0, order=0, oom_score_adj=841
...
[61086.851530] Task in /docker/73792fab2937c40a01556757fcd1381a871b97d4c9402e7c3320802a91d1ef04 killed as a result of limit of /docker/73792fab2937c40a01556757fcd1381a871b97d4c9402e7c3320802a91d1ef04
[61086.851531] memory: usage 11534336kB, limit 11534336kB, failcnt 361261
...
[61086.851599] Memory cgroup out of memory: Kill process 29052 (python) score 1920 or sacrifice child
[61086.851600] Killed process 28675 (python) total-vm:78213124kB, anon-rss:11494652kB, file-rss:2768kB
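
Using the hypothetical pod_limit_mib sketch from above, the overhead setting clearly took effect here: 1024 MiB heap + 10240 MiB overhead = 11264 MiB = 11534336 kB, exactly the cgroup limit in the log, and the killed python process's anon-rss (11494652 kB) fills nearly all of it:

print(pod_limit_mib(1024, overhead_mib=10240))  # 11264 MiB = 11534336 kB
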
tnachen commented 7 years ago

Not sure I follow correctly, but in your first example didn't you set it to 20 GB, whereas your last try used 11 GB?

duyanghao commented 7 years ago

@tnachen Here is the 20 GB test:

  1. Submit with memoryOverhead:
    --conf spark.driver.memory=1024m \
    --conf spark.driver.cores=2 \
    --conf spark.executor.instances=2 \
    --conf spark.executor.memory=1024m \
    --conf spark.executor.cores=2 \
    --conf spark.kubernetes.driver.memoryOverhead=20480m \
    --conf spark.kubernetes.executor.memoryOverhead=20480m \
    ...
  2. Driver and executor pod resources:
    Limits:
      cpu:      1
      memory:   21Gi
    Requests:
      cpu:              1
      memory:           1Gi
  3. The driver container aborts with an OOM kill (137), as below (dmesg_all):
    Exception in thread "main" org.apache.spark.SparkUserAppException: User application exited with 137
    at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:103)
    at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
[166152.253682] python invoked oom-killer: gfp_mask=0xd0, order=0, oom_score_adj=985
...
[166152.253782] Task in /docker/de2e07bd8155692babd2ea6fb2e37b62ce2bcaeace2d463f803430c16eb80b40 killed as a result of limit of /docker/de2e07bd8155692babd2ea6fb2e37b62ce2bcaeace2d463f803430c16eb80b40
[166152.253785] memory: usage 22020096kB, limit 22020096kB, failcnt 121663
...
[166152.253887] Memory cgroup out of memory: Kill process 17989 (python) score 1910 or sacrifice child
[166152.253912] Killed process 17612 (python) total-vm:78213132kB, anon-rss:21991236kB, file-rss:2768kB
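
One more observation, not a confirmed diagnosis: in all three runs the OOM-killed python process reports nearly the same virtual size (total-vm of 78213156, 78213124, and 78213132 kB, about 75 GiB) regardless of how the pod is sized. That suggests the driver-side Python program itself demands that much memory, which fits the uncapped n = 10**10 noted earlier, and which no heap/overhead split below that figure can satisfy:

# ~74.6 GiB of virtual memory in each run, independent of the pod limit
for total_vm_kb in (78213156, 78213124, 78213132):
    print(total_vm_kb / float(1 << 20))
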
duyanghao commented 7 years ago

@mccheah @ifilonenko @tnachen Any suggestions for this problem?

holdenk commented 6 years ago

So one thing I wish we had done in Spark on YARN is bumping up the overhead automatically when we are running Python code (it still requires tuning for some cases, but reasonable defaults ftw). What do you think @ifilonenko? (I have a goal of being able to take that part out of my standard Python Spark talks within a year, but that might be unwarranted optimism :p :)).

ifilonenko commented 6 years ago

@holdenk That definitely makes sense considering how much memory the JVM eats up. What would be a reasonable "bumped-up" overhead amount? Should it be calculated from cluster configuration, from user-passed configs, or just be a hard-coded value that we set?
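
One simple shape such a default could take, as a hypothetical sketch rather than an implemented proposal (mainline Spark later exposed a configurable spark.kubernetes.memoryOverheadFactor in this spirit; the 0.4 factor below is purely illustrative):

def default_overhead_mib(heap_mib, is_python):
    # Keep the JVM default of max(384 MiB, 10% of heap), but use a much
    # larger factor for Python apps, whose workers live outside the heap.
    factor = 0.4 if is_python else 0.1
    return max(384, int(factor * heap_mib))

print(default_overhead_mib(20480, is_python=True))   # 8192 MiB
print(default_overhead_mib(20480, is_python=False))  # 2048 MiB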

duyanghao commented 6 years ago

@holdenk @ifilonenko What do you think about the Python OOM problem above?