microsoft / SynapseML

Simple and Distributed Machine Learning
http://aka.ms/spark
MIT License

java.lang.NegativeArraySizeException while training #1338

Open shuDaoNan9 opened 2 years ago

shuDaoNan9 commented 2 years ago

If I do not set numBatches, training on a large dataset (about 26320507 rows) fails with either a 'NegativeArraySizeException' or an OOM error, and CPU utilization stays below 90%. If I do set numBatches, training takes longer, although CPU utilization is about 97% during each batch.

submit: spark-submit --master yarn --driver-cores 2 --driver-memory 5G --num-executors 1 --executor-memory 50G --executor-cores 47 --conf spark.driver.maxResultSize=3G --conf spark.yarn.heterogeneousExecutors.enabled=false --conf spark.dynamicAllocation.enabled=false --jars ......

code:

......
val classifier = new LightGBMClassifier()
      .setLabelCol("play")
      .setObjective("binary")
      .setCategoricalSlotNames(Array("countrycode_index","itemID_index","uid_index"))
      .setFeaturesCol("gbdtFeature")
      .setPredictionCol("predictPlay")
      .setNumIterations(trees) 
      .setNumLeaves(32)
      .setLearningRate(0.002)
      .setProbabilityCol("probabilitys")
      .setEarlyStoppingRound(200).setBoostingType("gbdt").setLambdaL2(0.002).setMaxDepth(24)
      .setNumBatches(10) // Without numBatches, training on the big dataset (about 26320507 rows) fails with 'NegativeArraySizeException' or OOM; with it, training takes longer.
      .setNumThreads(47)
      .setUseSingleDatasetMode(true)
......

NegativeArraySizeException log:

......
[LightGBM] [Warning] Met categorical feature which contains sparse values. Consider renumbering to consecutive integers started from zero
22/01/10 05:58:09 ERROR Executor: Exception in task 33.0 in stage 7.0 (TID 142)
java.lang.NegativeArraySizeException
    at com.microsoft.ml.lightgbm.lightgbmlibJNI.LGBM_BoosterSaveModelToStringSWIG(Native Method)
    at com.microsoft.ml.lightgbm.lightgbmlib.LGBM_BoosterSaveModelToStringSWIG(lightgbmlib.java:309)
    at com.microsoft.azure.synapse.ml.lightgbm.booster.LightGBMBooster.saveToString(LightGBMBooster.scala:273)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.translate(LightGBMBase.scala:320)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainLightGBM(LightGBMBase.scala:374)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$innerTrain$4(LightGBMBase.scala:481)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:195)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
22/01/10 05:58:09 WARN TaskSetManager: Lost task 33.0 in stage 7.0 (TID 142) (ip-172-17-20-30.eu-west-1.compute.internal executor driver): java.lang.NegativeArraySizeException
    at com.microsoft.ml.lightgbm.lightgbmlibJNI.LGBM_BoosterSaveModelToStringSWIG(Native Method)
    at com.microsoft.ml.lightgbm.lightgbmlib.LGBM_BoosterSaveModelToStringSWIG(lightgbmlib.java:309)
    at com.microsoft.azure.synapse.ml.lightgbm.booster.LightGBMBooster.saveToString(LightGBMBooster.scala:273)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.translate(LightGBMBase.scala:320)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainLightGBM(LightGBMBase.scala:374)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$innerTrain$4(LightGBMBase.scala:481)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:195)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

22/01/10 05:58:09 ERROR TaskSetManager: Task 33 in stage 7.0 failed 1 times; aborting job
22/01/10 05:58:09 ERROR LightGBMClassifier: {"uid":"LightGBMClassifier_3b81cbf9ee51","className":"class com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier","method":"train","buildVersion":"0.9.4"}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 7.0 failed 1 times, most recent failure: Lost task 33.0 in stage 7.0 (TID 142) (ip-172-17-20-30.eu-west-1.compute.internal executor driver): java.lang.NegativeArraySizeException
    at com.microsoft.ml.lightgbm.lightgbmlibJNI.LGBM_BoosterSaveModelToStringSWIG(Native Method)
    at com.microsoft.ml.lightgbm.lightgbmlib.LGBM_BoosterSaveModelToStringSWIG(lightgbmlib.java:309)
    at com.microsoft.azure.synapse.ml.lightgbm.booster.LightGBMBooster.saveToString(LightGBMBooster.scala:273)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.translate(LightGBMBase.scala:320)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainLightGBM(LightGBMBase.scala:374)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$innerTrain$4(LightGBMBase.scala:481)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:195)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2418)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2241)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2336)
    at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1120)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1102)
    at org.apache.spark.sql.Dataset.$anonfun$reduce$1(Dataset.scala:1787)
    at org.apache.spark.sql.Dataset$RDDQueryExecution.$anonfun$withNewExecutionId$1(Dataset.scala:3837)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.Dataset$RDDQueryExecution.withNewExecutionId(Dataset.scala:3835)
    at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3742)
    at org.apache.spark.sql.Dataset.reduce(Dataset.scala:1787)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.innerTrain(LightGBMBase.scala:486)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.innerTrain$(LightGBMBase.scala:443)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.innerTrain(LightGBMClassifier.scala:26)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$1(LightGBMBase.scala:63)
    at com.microsoft.azure.synapse.ml.logging.BasicLogging.logVerb(BasicLogging.scala:63)
    at com.microsoft.azure.synapse.ml.logging.BasicLogging.logVerb$(BasicLogging.scala:60)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:26)
    at com.microsoft.azure.synapse.ml.logging.BasicLogging.logTrain(BasicLogging.scala:49)
    at com.microsoft.azure.synapse.ml.logging.BasicLogging.logTrain$(BasicLogging.scala:48)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logTrain(LightGBMClassifier.scala:26)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:44)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:43)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:26)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:26)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
    at myPackage$.lgbmTrain(GbdtForCol1.scala:201)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:959)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1047)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1056)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NegativeArraySizeException
    at com.microsoft.ml.lightgbm.lightgbmlibJNI.LGBM_BoosterSaveModelToStringSWIG(Native Method)
    at com.microsoft.ml.lightgbm.lightgbmlib.LGBM_BoosterSaveModelToStringSWIG(lightgbmlib.java:309)
    at com.microsoft.azure.synapse.ml.lightgbm.booster.LightGBMBooster.saveToString(LightGBMBooster.scala:273)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.translate(LightGBMBase.scala:320)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainLightGBM(LightGBMBase.scala:374)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$innerTrain$4(LightGBMBase.scala:481)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:195)
......

Thank you!

AB#1884827

imatiach-msft commented 2 years ago

@JWenBin I would recommend avoiding numBatches if possible. All it does is split the dataset into multiple batches, train the model on the first batch, then retrain it on the second batch, and so on. As you increase the number of batches it becomes much less likely to run out of memory, but it will also likely decrease model accuracy and increase training time.

It looks like you have 26320507 rows. How many columns are there? What is your cluster size: the number of nodes, CPUs per machine and RAM per machine?

Based on the submission command you posted:

--num-executors 1 --executor-memory 50G --executor-cores 47

it looks like you are using a single executor with 50GB RAM allocated to it and 47 cores to run distributed training: https://community.cloudera.com/t5/Support-Questions/Spark-num-executors-setting/td-p/149434

--num-executors is the total number of executors. Did you intend to run all training on just a single machine/executor?
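The batch behaviour described above (split, train on the first batch, then continue on each later batch) can be pictured with a small plain-Scala sketch. This is only an illustration of the idea, not SynapseML's implementation: `Model`, `trainOn` and `trainInBatches` are hypothetical names, and the "model" merely records how many rows each batch contributed.

```scala
object BatchTrainingSketch {
  // Hypothetical stand-in for a trained model: it only records
  // how many rows were seen in each successive batch.
  final case class Model(rowsSeenPerBatch: Vector[Int])

  // Hypothetical trainer: continues from the previous model, if there is one.
  def trainOn(batch: Seq[Int], previous: Option[Model]): Model =
    Model(previous.map(_.rowsSeenPerBatch).getOrElse(Vector.empty) :+ batch.size)

  // Splits the rows into at most numBatches chunks and trains on them in order,
  // so only one chunk has to be processed at a time.
  def trainInBatches(rows: Seq[Int], numBatches: Int): Option[Model] = {
    require(numBatches > 0, "numBatches must be positive")
    val batchSize = math.max(1, math.ceil(rows.size.toDouble / numBatches).toInt)
    rows.grouped(batchSize).foldLeft(Option.empty[Model]) { (prev, batch) =>
      Some(trainOn(batch, prev))
    }
  }

  def main(args: Array[String]): Unit =
    println(trainInBatches(1 to 100, 10))
}
```

Each batch only ever sees a fraction of the rows, which is consistent with the lower memory pressure and the possible loss in accuracy mentioned above.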

shuDaoNan9 commented 2 years ago

@imatiach-msft Thank you for your reply! In this case, my dataset has 26320507 rows and 38 columns. I used a single machine (48 vCore, 96 GiB memory) for the test, because I find it trains faster than multiple machines/executors in my case. It looks like 'java.lang.NegativeArraySizeException' only occurs in this test case. In our production environment the dataset has more than 60m rows and 39 columns, and everything works well there: training takes about 20 minutes on a single machine/executor (96 vCore, 192 GiB memory) using the same method. I previously tried 3 machines/executors (36 vCore, 72 GiB memory each) instead, but training took longer and cost more money.

Code for my multiple machines/executors case:

 val classifier = new LightGBMClassifier()
      .setLabelCol("click")
      .setObjective("binary")
      .setCategoricalSlotNames(Array("ua_index","uid_index",......))
      .setUseBarrierExecutionMode(true)
      .setFeaturesCol("gbdtFeature")
      .setPredictionCol("predictPlay")
      .setNumIterations(5000) 
      .setNumLeaves(32)
      .setLearningRate(0.002)
      .setProbabilityCol("probabilitys")
      .setEarlyStoppingRound(200).setBoostingType("gbdt").setLambdaL2(0.002).setMaxDepth(24)
    val lgbmModel = classifier.fit(trainDF.repartition(numOfMachines)) // 'repartition(numOfMachines)' may mean less network communication.

Submit command for my multiple machines/executors case: spark-submit --master yarn --driver-cores 2 --driver-memory 5G --num-executors 3 --executor-memory 50G --executor-cores 35 --conf spark.driver.maxResultSize=3G --conf spark.yarn.heterogeneousExecutors.enabled=false --conf spark.dynamicAllocation.enabled=false --jars ......

Cluster information for my multiple machines/executors case: 3 nodes, each with 36 vCore and 72 GiB memory. Spark 3.1.2

Thank you very much!

imatiach-msft commented 2 years ago

hi @JWenBin I wonder if there is some sort of overflow happening here in this function when you hit the "NegativeArraySizeException": https://github.com/microsoft/LightGBM/blob/master/swig/lightgbmlib.i#L37

It looks like your dataset is ~8GB in size (26320507 rows × 38 columns × 8 bytes per value), but I don't think it should run OOM. We currently create a second copy of the dataset in memory when passing it to LightGBM, but that would still only be ~16 GB, and even with Spark taking up quite a bit more memory it should still be well below 50GB, I think.
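The size estimate can be reproduced with a few lines of arithmetic. A minimal sketch, assuming every value is stored as an 8-byte double and using decimal gigabytes (1 GB = 10^9 bytes); the object and method names are illustrative only:

```scala
object DatasetSizeEstimate {
  // Raw size in bytes of a dense rows x cols table.
  // Long arithmetic is used on purpose: the product does not fit in a 32-bit Int.
  def rawBytes(rows: Long, cols: Long, bytesPerValue: Long): Long =
    rows * cols * bytesPerValue

  // Converts bytes to decimal gigabytes.
  def toGB(bytes: Long): Double = bytes / 1e9

  def main(args: Array[String]): Unit = {
    val oneCopy = rawBytes(26320507L, 38L, 8L)
    println(f"one copy:   ${toGB(oneCopy)}%.2f GB")
    println(f"two copies: ${toGB(2 * oneCopy)}%.2f GB")
  }
}
```

One copy comes to 8001434128 bytes, so roughly 8 GB, and two copies to roughly 16 GB. This counts raw values only and ignores JVM and Spark overhead.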

It would be interesting to add some debugging messages around that function to see what value specifically is negative. Perhaps it's this line (the buffer_len variable?):

char* dst = new char[buffer_len];

or something else. I can't quite tell, since the stack trace doesn't give the exact line of code or line number. I just see that it's coming from the function "LGBM_BoosterSaveModelToStringSWIG":

Caused by: java.lang.NegativeArraySizeException
    at com.microsoft.ml.lightgbm.lightgbmlibJNI.LGBM_BoosterSaveModelToStringSWIG(Native Method)

That function isn't actually doing anything with the dataset; it is just saving the model.
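The overflow hypothesis can be illustrated on the JVM side. This is a minimal sketch, not taken from the LightGBM or SynapseML sources: it assumes that a required buffer length larger than Int.MaxValue gets narrowed to a 32-bit integer somewhere along the way, and shows that allocating an array with the resulting negative length raises the same exception type. The 3000000000-byte length is a made-up value, and the object and method names are hypothetical.

```scala
import scala.util.Try

object NegativeLengthSketch {
  // Narrowing a 64-bit length to 32 bits keeps only the low 32 bits,
  // so values above Int.MaxValue can come out negative.
  def narrow(length: Long): Int = length.toInt

  // Attempts to allocate a byte array of the given length,
  // capturing any exception instead of letting it propagate.
  def allocate(length: Int): Try[Array[Byte]] = Try(new Array[Byte](length))

  def main(args: Array[String]): Unit = {
    val needed = 3000000000L       // hypothetical serialized-model size in bytes
    val narrowed = narrow(needed)  // wraps around to a negative Int
    println(s"needed=$needed narrowed=$narrowed")
    println(allocate(narrowed))    // a Failure wrapping NegativeArraySizeException
  }
}
```

If something like this is what happens, the failure would depend on the size of the serialized model rather than on the size of the dataset, but that remains a guess until the actual negative value is logged.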

kyoungrok0517 commented 1 year ago

Sorry to bother you with a year-old issue, but did you find the solution (or the reason)? I'm experiencing the same issue. The strange thing is that it only happens when I set a high number of boosting rounds (e.g. 7,000); there is no issue with a low number (e.g. 2000).