microsoft / SynapseML

Simple and Distributed Machine Learning
http://aka.ms/spark
MIT License

Lightgbm - mysterious OOM problems #1124

Closed (trillville closed this issue 3 years ago)

trillville commented 3 years ago

I am consistently getting errors like this at the reduce step while trying to train a lightgbm model:

org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(7, 0) finished unsuccessfully.
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container from a bad node: container_1626376382032_0004_01_000002 on host: spark-ml-pipeline2-w-1.c.xxx.internal. Exit status: 137. Diagnostics: [2021-07-15 19:41:35.679]Container killed on request. Exit code is 137
[2021-07-15 19:41:35.680]Container exited with a non-zero exit code 137.

dataset rows: 208,840,700; features: 110; size: ~150 GB

training code/params:

from pyspark.sql import SparkSession
from mmlspark.lightgbm import LightGBMClassifier  # import path for the mmlspark 1.0.0-rc3 line used below

spark = SparkSession \
        .builder \
        .appName(f"{args['model_type']}-model-train") \
        .getOrCreate()

# ~209M rows, 110 features, ~150 GB on disk
train = spark.read.parquet("gs://blahblah").select("id", "dt", "features", "label")
test = spark.read.parquet("gs://blahblah").select("id", "dt", "features", "label")

model = LightGBMClassifier(
    labelCol="label",
    objective="binary",
    maxDepth=8,
    numLeaves=70,
    learningRate=0.04,
    featureFraction=0.8,
    lambdaL1=3.0,
    lambdaL2=3.0,
    posBaggingFraction=1.0,
    negBaggingFraction=0.5,
    baggingFreq=10,
    numIterations=200,
    maxBin=63,
    useBarrierExecutionMode=True,
)

trained = model.fit(train)
results = trained.transform(test)

cluster config: 3x n2-highmem-16 workers (16 vCPUs and 128 GB of memory each)

spark:spark.driver.maxResultSize = 1920m
spark:spark.driver.memory = 3840m
spark:spark.dynamicAllocation.enabled = false
spark:spark.executor.cores = 8
spark:spark.executor.instances = 2
spark:spark.executor.memory = 57215m
spark:spark.executorEnv.OPENBLAS_NUM_THREADS = 1
spark:spark.jars.packages = com.microsoft.ml.spark:mmlspark:1.0.0-rc3-148-87ec5f74-SNAPSHOT
spark:spark.jars.repositories = https://mmlspark.azureedge.net/maven
spark:spark.scheduler.mode = FAIR
spark:spark.shuffle.service.enabled = false
spark:spark.sql.cbo.enabled = true
spark:spark.ui.port = 0
spark:spark.yarn.am.memory = 640m
yarn-env:YARN_NODEMANAGER_HEAPSIZE = 4000
yarn-env:YARN_RESOURCEMANAGER_HEAPSIZE = 3840
yarn-env:YARN_TIMELINESERVER_HEAPSIZE = 3840
yarn:yarn.nodemanager.address = 0.0.0.0:8026
yarn:yarn.nodemanager.resource.cpu-vcores = 16
yarn:yarn.nodemanager.resource.memory-mb = 125872
yarn:yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs = 86400
yarn:yarn.scheduler.maximum-allocation-mb = 125872
yarn:yarn.scheduler.minimum-allocation-mb = 1

Stacktrace

py4j.protocol.Py4JJavaError: An error occurred while calling o82.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(7, 0) finished unsuccessfully.
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container from a bad node: container_1626376382032_0004_01_000002 on host: spark-ml-pipeline2-w-1.c.xxx. Exit status: 137. Diagnostics: [2021-07-15 19:41:35.679]Container killed on request. Exit code is 137
[2021-07-15 19:41:35.680]Container exited with a non-zero exit code 137.
[2021-07-15 19:41:35.680]Killed by external signal
.
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2259)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2208)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2207)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1968)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2443)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2388)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2377)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2297)
    at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1120)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1102)
    at com.microsoft.ml.spark.lightgbm.LightGBMBase.innerTrain(LightGBMBase.scala:481)
    at com.microsoft.ml.spark.lightgbm.LightGBMBase.innerTrain$(LightGBMBase.scala:440)
    at com.microsoft.ml.spark.lightgbm.LightGBMClassifier.innerTrain(LightGBMClassifier.scala:26)
    at com.microsoft.ml.spark.lightgbm.LightGBMBase.$anonfun$train$1(LightGBMBase.scala:63)
    at com.microsoft.ml.spark.logging.BasicLogging.logVerb(BasicLogging.scala:63)
    at com.microsoft.ml.spark.logging.BasicLogging.logVerb$(BasicLogging.scala:60)
    at com.microsoft.ml.spark.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:26)
    at com.microsoft.ml.spark.logging.BasicLogging.logTrain(BasicLogging.scala:49)
    at com.microsoft.ml.spark.logging.BasicLogging.logTrain$(BasicLogging.scala:48)
    at com.microsoft.ml.spark.lightgbm.LightGBMClassifier.logTrain(LightGBMClassifier.scala:26)
    at com.microsoft.ml.spark.lightgbm.LightGBMBase.train(LightGBMBase.scala:44)
    at com.microsoft.ml.spark.lightgbm.LightGBMBase.train$(LightGBMBase.scala:43)
    at com.microsoft.ml.spark.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:26)
    at com.microsoft.ml.spark.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:26)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

When I do train = train.sample(withReplacement=False, fraction=0.25), the job runs successfully. I'm kinda guessing that I could fix it by throwing more resources at the problem, but I would think my current cluster should be totally overkill given the dataset size.

So far I've tried:

I am on Spark 3.0 and using com.microsoft.ml.spark:mmlspark:1.0.0-rc3-148-87ec5f74-SNAPSHOT.

Thank you!

TomFinley commented 3 years ago

Hi @trillville, sorry to hear of your problems. I see you're using the latest branch, but have also tried other recent branches, so perhaps it is not the result of a recent change. Just a few follow-up questions:

It seems pretty clear from the message that this OOM (exit code 137) happened within a node. Unfortunately, on the JVM side the execution is fairly monolithic, with most logic happening inside the native C++ implementation of LightGBM. Aside from the stack trace, were there any output logs that might indicate what the native code was doing? I'd be surprised if it was anything other than the dataset preparation step, as it converts the data from the JVM into the native format, but in the end I'm just guessing. In the remainder I'm going to proceed on the assumption that the guess is correct.

So just for laughs, would you mind repartitioning to 16 explicitly yourself, just to see if it's a simple dataset task-imbalance issue? The reason I think this might be important is that the intermediate memory buffers constructed on the Scala side before passing into native code can get pretty large; the current Spark wrapper of LightGBM does not construct a LightGBM dataset in a streaming fashion. That said, I might expect 128 GB per task to easily be enough. But I wonder, then: is the input data partitioned so it is evenly spread among those 16 workers? The wrapper does contain some repartition logic, but it is somewhat conservative w.r.t. what it attempts, only ever reducing the number of partitions. (As seen here.)

https://github.com/Azure/mmlspark/blob/87ec5f7442e2fca4003c952d191d0ea5f7d61eac/lightgbm/src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala#L135-L144
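
A minimal sketch of the explicit repartition suggested above, reusing the train DataFrame and model from the issue body; 16 is an illustration matching this cluster's 2 executors x 8 cores, not a prescribed value:

# Hypothetical illustration: spread the input evenly across the executor cores
# before fitting, since the wrapper's own repartition logic only ever reduces
# the number of partitions.
train_16 = train.repartition(16)
trained = model.fit(train_16)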

Failing that, another thing that comes to mind: there was one relatively recent PR #1066, which you should have access to, that changes how the datasets are prepared and in some cases reduces the amount of memory used by the nodes. It is the useSingleDatasetMode parameter, by default false -- could we try turning it to true? However, I might expect that to help if the memory problems were due to something other than LightGBM's internal dataset preparation.
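
For reference, a sketch of what enabling that flag might look like, reusing the constructor call from the issue body; only the new parameter is shown as added, everything else stays as in the original snippet:

# Sketch: same classifier as in the original snippet, with the single-dataset mode
# from PR #1066 switched on; it is intended to reduce per-node memory use by having
# partitions on the same worker share one native dataset instead of one per task.
model = LightGBMClassifier(
    labelCol="label",
    objective="binary",
    useBarrierExecutionMode=True,
    useSingleDatasetMode=True,  # default is False
    # ...remaining hyperparameters unchanged from the original snippet
)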

imatiach-msft commented 3 years ago

@trillville yes, could you please try setting the new parameter useSingleDatasetMode=True to see if it helps? Also, if I understand correctly: 208,840,700 rows x 110 cols x 8 bytes per value = 183,779,816,000 bytes ~= 183 GB in memory. However, it seems like your cluster should still be able to handle this.
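
A quick back-of-the-envelope check of that estimate, assuming dense 8-byte values and ignoring JVM object overhead and intermediate copies:

# Rough size of the dense feature matrix alone; actual usage will be higher
# because of JVM overhead and the buffers built during dataset preparation.
rows, cols, bytes_per_value = 208_840_700, 110, 8
total_bytes = rows * cols * bytes_per_value
print(f"{total_bytes:,} bytes ~= {total_bytes / 1e9:.1f} GB")  # 183,779,816,000 bytes ~= 183.8 GB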

trillville commented 3 years ago

Thank you both for the suggestions! In my case useSingleDatasetMode=True actually led to successful model training. I think you were right that the dataset was actually right up against the limit that the cluster could handle. I don't think the input data was imbalanced in this case, and repartitioning alone did nothing.

trillville commented 3 years ago

thanks again - everything is working for me :)