intel-analytics / analytics-zoo

Distributed Tensorflow, Keras and PyTorch on Apache Spark/Flink & Ray
https://analytics-zoo.readthedocs.io/
Apache License 2.0

NullPointerException in com.intel.analytics.bigdl.models.utils.ModelBroadcastImp.value(ModelBroadcast.scala:166) #9

Closed. dzlab closed this issue 2 years ago.

dzlab commented 2 years ago

I'm seeing a NullPointerException when trying to train a feed-forward model; this is the full stack trace:

2022-02-08T17:33:10.748+0000 level=INFO thread="pool-36-thread-1" logger=com.intel.analytics.bigdl.optim.DistriOptimizer$ req_id="" user_id=""
Cache thread models...
2022-02-08T17:33:11.165+0000 level=WARN thread="task-result-getter-1" logger=org.apache.spark.scheduler.TaskSetManager req_id="" user_id=""
Lost task 0.0 in stage 16.0 (TID 833) (10.0.10.129 executor 1): java.lang.NullPointerException
    at com.intel.analytics.bigdl.models.utils.ModelBroadcastImp.value(ModelBroadcast.scala:166)
    at com.intel.analytics.bigdl.optim.DistriOptimizer$.$anonfun$initThreadModels$4(DistriOptimizer.scala:612)
    at com.intel.analytics.bigdl.optim.DistriOptimizer$.$anonfun$initThreadModels$4$adapted(DistriOptimizer.scala:611)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at com.intel.analytics.bigdl.optim.DistriOptimizer$.$anonfun$initThreadModels$2(DistriOptimizer.scala:611)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Any idea what could be causing this?

qiyuangong commented 2 years ago

Hi @dzlab

Thank you for raising this issue!

According to this warning message, it seems that Spark failed to get the broadcast model. This warning usually happens when Spark & BigDL fail to obtain enough resources (memory in most cases). Please try enlarging the executor and driver memory; that should address this problem.
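For example, something along these lines (the values are only placeholders to illustrate the idea, not a tuned recommendation):

import org.apache.spark.SparkConf

// Executor memory can be raised on the SparkConf before the context is created.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.executor.memoryOverhead", "1g")
// Driver memory usually has to be set at submit time (e.g. spark-submit --driver-memory 4g),
// because in client deploy mode the driver JVM is already running when user code executes.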

BTW, can you share more details about your example or the model used, if possible? Also, the BigDL- or Spark-related configuration used in training?

dzlab commented 2 years ago

Hi @qiyuangong, thanks for the hint, that definitely makes sense because I was using 1G for the driver and the same for the executor. I was training a simple feed-forward model on the Iris dataset. The code is almost identical to the one here; I just made the model bigger, with 3 hidden layers of 50:100:50. I will try increasing the memory and let you know here, I think it will address the issue 💯

qiyuangong commented 2 years ago


Nice blog! :)

Let me know if there is any further problem.

As a reminder, analytics-zoo is being merged into BigDL (https://github.com/intel-analytics/BigDL). All analytics-zoo features and examples are now available in BigDL. That also means we will only add new features to BigDL rather than analytics-zoo (we will keep fixing bugs in analytics-zoo if necessary).

dzlab commented 2 years ago

@qiyuangong I'm still seeing the issue after using 2G and then 4G for both the driver and the executor. Looking at the model size, it is a small one, so I guess something else is causing the problem.

Model Summary:
------------------------------------------------------------------------------------------------------------------------
Layer (type)                            Output Shape              Param #       Connected to                          
========================================================================================================================
Input54f882db (Input)                   (None, 4)                 0                                                   
________________________________________________________________________________________________________________________
Input Layer (Dense)                     (None, 50)                250           Input54f882db                         
________________________________________________________________________________________________________________________
Hidden Layer (Dense)                    (None, 50)                2550          Input Layer                           
________________________________________________________________________________________________________________________
Hidden Layer 2 (Dense)                  (None, 50)                2550          Hidden Layer                          
________________________________________________________________________________________________________________________
Output Layer (Dense)                    (None, 3)                 153           Hidden Layer 2                        
________________________________________________________________________________________________________________________
Total params: 5,503
Trainable params: 5,503
Non-trainable params: 0
------------------------------------------------------------------------------------------------------------------------

I will try giving it more, like 8G for the executor. In your opinion, what is the minimum for such a model? The dataset is also small: it is Iris, with fewer than 500 rows, 4 feature columns (double), and 1 string column for the labels.
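For scale, 5,503 float parameters is only about 5,503 × 4 bytes ≈ 22 KB (roughly the same again for gradients), so the model itself should be negligible next to even a 1G heap.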

qiyuangong commented 2 years ago


Yes, I think your resources are enough for that model and dataset. I will try to reproduce your example locally.

Can you tell me the batchSize and the number of cores used in training? A large batch size may also lead to OOM.
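If it helps, here is a rough sketch of how I think about the global batch size (BigDL's distributed optimizer generally expects it to be a multiple of the total cores, i.e. executor instances × cores per executor; the numbers below are only illustrative):

// Illustrative sketch only: relating the global batch size to the available cores.
val executorInstances = 1   // spark.executor.instances in your setup
val executorCores     = 1   // spark.executor.cores in your setup
val batchPerCore      = 64  // samples each core processes per iteration
val globalBatchSize   = executorInstances * executorCores * batchPerCore  // 64 here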

dzlab commented 2 years ago

@qiyuangong I'm using just a batch size of 64 and 10 epochs for training. More details on my setup (not sure if it could help): I'm using k8s to run the Spark cluster, which is simply 1 driver and 1 executor. My dependencies:

lazy val bigdlLibs = Seq(
  "com.intel.analytics.zoo" % "analytics-zoo-bigdl_0.12.1-spark_3.0.0" % "0.9.0",
  "ml.dmlc" %% "xgboost4j-spark" % "1.2.0",
  "ml.dmlc" %% "xgboost4j" % "1.2.0"
)

From the logs, this is most of the Spark configuration we set:

LAUNCH_CLASSPATH='/tmp/dzlab/lib/:/opt/spark/conf/:/opt/spark/jars/*:/opt/hadoop3.3.1/etc/hadoop/:/opt/hadoop3.3.1/share/hadoop/common/lib/*:/opt/hadoop3.3.1/share/hadoop/common/*:/opt/hadoop3.3.1/share/hadoop/hdfs/lib/*:/opt/hadoop3.3.1/share/hadoop/hdfs/*:/opt/hadoop3.3.1/share/hadoop/mapreduce/*:/opt/hadoop3.3.1/share/hadoop/yarn/lib/*:/opt/hadoop3.3.1/share/hadoop/yarn/*'
+ exec /usr/java/default/bin/java -cp '/tmp/dzlab/lib/:/opt/spark/conf/:/opt/spark/jars/*:/opt/hadoop3.3.1/etc/hadoop/:/opt/hadoop3.3.1/share/hadoop/common/lib/*:/opt/hadoop3.3.1/share/hadoop/common/*:/opt/hadoop3.3.1/share/hadoop/hdfs/lib/*:/opt/hadoop3.3.1/share/hadoop/hdfs/*:/opt/hadoop3.3.1/share/hadoop/mapreduce/*:/opt/hadoop3.3.1/share/hadoop/yarn/lib/*:/opt/hadoop3.3.1/share/hadoop/yarn/*' -Xmx1G -XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+UseContainerSupport org.apache.spark.deploy.SparkSubmit --conf spark.driver.bindAddress=10.0.132.15 --conf spark.kubernetes.driver.pod.name=dzlab-558454f8cf-r4zh8 --deploy-mode client --master k8s://kubernetes.default.svc:443 --conf spark.kubernetes.namespace=dev-dzlab --conf spark.kubernetes.driver.container.image=artifacts.dzlab/dzlab-kube:7.30.0-alpha.10280.g4ef1c3d-SNAPSHOT --conf spark.kubernetes.executor.container.image=artifacts.dzlab/dzlab-kube:7.30.0-alpha.10280.g4ef1c3d-SNAPSHOT --conf spark.kubernetes.executor.podNamePrefix=dzlab --conf spark.kubernetes.authenticate.oauthTokenFile=/opt/dzlab/files/token --conf spark.driver.memory=4G --conf spark.driver.memoryOverhead=385M --conf spark.driver.cores=1 --conf spark.executor.memory=4G --conf spark.executor.memoryOverhead=385M --conf spark.executor.cores=1 --conf spark.kubernetes.executor.limit.cores=1 --conf spark.executor.instances=1 --conf spark.driver.host=dzlab.dev-dzlab.svc.cluster.local --conf spark.driver.port=7077 --conf spark.driver.blockManager.port=10000 --conf spark.sql.shuffle.partitions=200 --conf spark.sql.planner.externalSort=true --conf spark.sql.hive.thriftServer.singleSession=true --conf spark.vad.startHiveServer=false --conf spark.dzlab.defaultLimit= --conf spark.dzlab.maxLimit= --conf spark.dzlab.maxOutputPartitions=1000 --conf spark.vad.fileStore=local --conf spark.vad.port=10011 --conf spark.vad.protocol=HTTPS --conf spark.vad.fixed.auth=false --conf spark.kubernetes.driver.label.parentName=dzlab --conf spark.kubernetes.executor.label.parentName=dzlab--conf spark.kubernetes.driver.label.role=ex_machina --conf spark.kubernetes.executor.label.role=ex_machina --conf spark.kubernetes.executor.secrets.dzlab=/opt/dzlab/files --conf spark.kubernetes.executor.secrets.dzlab=/opt/dzlab/files --conf spark.kubernetes.executor.podTemplateFile=/opt/dzlab/files/pod-template.yaml --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent --conf spark.kubernetes.container.image.pullSecrets= --conf spark.kubernetes.executor.volumes.emptyDir.empty-tmp.mount.path=/tmp --conf spark.kubernetes.executor.volumes.emptyDir.empty-var-tmp.mount.path=/var/tmp --conf spark.kubernetes.executor.volumes.emptyDir.unused.options.claimName=OnDemand --conf spark.kubernetes.executor.volumes.emptyDir.unused.options.storageClass=default --conf spark.kubernetes.executor.volumes.emptyDir.unused.options.sizeLimit=4G --conf spark.kubernetes.executor.volumes.emptyDir.unused.mount.path=/spilltodisk --conf spark.kubernetes.executor.volumes.emptyDir.unused.mount.readOnly=false --conf spark.kubernetes.local.dirs.tmpfs=false --conf spark.files.useFetchCache=true --conf 'spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+UseContainerSupport  ' --conf 'spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+UseContainerSupport  ' --conf spark.executor.heartbeatInterval=30s --conf spark.driver.extraClassPath=/tmp/dzlab/lib --conf spark.authenticate=true --conf spark.network.crypto.enabled=true --class dzlab.APIServer /tmp/dzlab/lib/dzlab.jar

This is my Spark config and how I initialize BigDL:

  def setBigDLConf(conf: SparkConf): SparkConf = {
    conf.set("spark.shuffle.reduceLocality.enabled", "false")
      .set("spark.shuffle.blockTransferService", "nio")
      .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
      .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "3600s")
      .set("spark.speculation", "false")
    val driverCores = conf.get("spark.driver.cores", "0").toInt                        // in my test this is 1
    val executorCores = conf.get("spark.executor.cores", "0").toInt              // in my test this is 1
    val executorInstances = conf.get("spark.executor.instances", "0").toInt // in my test this is 1
    val maxCores = String.valueOf(driverCores + executorCores * executorInstances) // in my test this becomes 2
    log.info("Updating Spark configuration spark.cores.max={}", maxCores)
    conf.set("spark.cores.max", maxCores)
  }
val conf = setBigDLConf(new SparkConf())
// Disable redirecting logs of Spark and BigDL
System.setProperty("bigdl.utils.LoggerFilter.disable", "true")
NNContext.initNNContext(conf)

In the logs I see:

2022-02-24T17:37:29.324+0000 level=INFO thread="main" logger=dzlab.commons.Config req_id="" user_id=""
Updating Spark configuration spark.cores.max=2

Before training, I transform the Iris DataFrame like this:

  def prepareDatasetForFitting(df: DataFrame, featureColumns: Array[String], labelColumn: String, labels: Array[String]): RDD[Sample[Float]] = {
    val labelIndex = df.columns.indexOf(labelColumn)
    val featureIndices = featureColumns.map(fc => df.columns.indexOf(fc))
    val dimInput = featureColumns.length
    df.rdd.map{row =>
      val features = featureIndices.map(row.getDouble(_).toFloat)
      val featureTensor = Tensor[Float](features, Array(dimInput))
      val labelTensor = Tensor[Float](1)
      labelTensor(Array(1)) = labels.indexOf(String.valueOf(row.get(labelIndex))) + 1
      Sample[Float](featureTensor, labelTensor)
    }
  }
val trainRDD = prepareDatasetForFitting(trainDF, featureColumns, labelColumn, labels)
val validRDD = prepareDatasetForFitting(validDF, featureColumns, labelColumn, labels)

I create the model and train it like this (more or less):

val model = Sequential[Float]()
val layers: Seq[Dense[Float]] = // create dense layers
layers.foreach(layer => model.add(layer))
val optimizer = new SGD[Float](0.001)
val loss = BCECriterion[Float]()
val metrics = List[ValidationMethod[Float]](new Top1Accuracy[Float]())
model.compile(optimizer, loss, metrics)
model.fit(trainRDD, trainBatchSize, numEpochs, validRDD)

The crash happens when it tries to read the model in the executor, but this works like a charm if I run it locally.

qiyuangong commented 2 years ago

Hi @dzlab

I have tried your example on my local desktop. The code is here (https://github.com/qiyuangong/Zoo_Benchmark/pull/5). Please take a look.

My test passed both locally and on Kubernetes, with 2G and 4G for both the driver and the executor. My batch size was 36-72. Training works fine, but I saw that the job used many more cores than expected. I have added my command to the README of that PR (https://github.com/qiyuangong/Zoo_Benchmark/blob/4c67b6faf63149ad2042808f6ff2faef1a985383/src/main/scala/com/intel/analytics/zoo/benchmark/training/README.md).

Can you share the full logs and the submit command of your job?

BTW, please avoid using Hadoop 3.x, because most Spark applications are still built against Hadoop 2.7.x.

dzlab commented 2 years ago

@qiyuangong thanks for looking into this, I probably messed up some config then. I will try to investigate this further.

qiyuangong commented 2 years ago


OK, I think you can remove some configs or parameters from spark-submit, then add them back one by one if necessary.

Let me know if there is any problem.

dzlab commented 2 years ago

problem fixed