cloudml / zen

Zen aims to provide the largest-scale and most efficient machine learning platform on top of Spark, including but not limited to logistic regression, latent Dirichlet allocation, factorization machines, and DNNs.
Apache License 2.0

(FM/MVM, etc.): ArrayIndexOutOfBoundsException when scaling #62

Open benmccann opened 8 years ago

benmccann commented 8 years ago

I get the exception below whenever I run the FM code on any of my real datasets. It seems to break roughly when you have >100k training examples and >100 machines.

java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
    at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:163)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here's the driver stacktrace:

org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
com.github.cloudml.zen.ml.partitioner.DBHPartitioner$.partitionByDBH(DBHPartitioner.scala:70)
com.github.cloudml.zen.ml.recommendation.FM$.initializeDataSet(FM.scala:498)
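
The "ArrayIndexOutOfBoundsException: -1" is the signature of a failed key lookup: reading the trace, EdgePartition.updateVertices appears to have looked up a vertex id that was never routed to that edge partition, so the hash map returned the sentinel position -1 and indexed its value array unchecked. Below is a simplified, hypothetical sketch of that failure pattern (an inference from the stack trace, not the actual GraphX or Zen source):

  // Simplified stand-in for an open-addressing hash map whose lookup
  // returns -1 for an absent key and then indexes the value array
  // with that position unchecked.
  class TinyOpenHashMap[V](keys: Array[Long], values: Array[V]) {
    private def getPos(k: Long): Int = keys.indexOf(k) // -1 when the key is absent

    def apply(k: Long): V = values(getPos(k)) // missing key => values(-1)
  }

  new TinyOpenHashMap(Array(1L, 2L), Array("a", "b")).apply(3L)
  // java.lang.ArrayIndexOutOfBoundsException: -1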

The odd thing is that the driver stack trace shows the error happening in initializeDataSet, but it doesn't actually occur until training has finished. To speed up reproducing the problem I set numIterations to 1.
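
This is consistent with Spark's lazy evaluation: the driver stack trace records where the RDD lineage was defined (initializeDataSet calls mapPartitions via partitionByDBH), while the exception is only thrown once an action forces that lineage to execute. A minimal sketch (hypothetical, assuming a spark-shell where sc is already available):

  // The transformation is only *defined* here; nothing runs yet.
  val rdd = sc.parallelize(1 to 3)
    .mapPartitions(it => it.map(x => 10 / (x - 2)))
  // The ArithmeticException (/ by zero) only surfaces here, at the action,
  // yet the stack trace points back at the mapPartitions call site above.
  rdd.count()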

witgo commented 8 years ago

@benmccann Thanks, I'll keep track of this bug and post progress here.

witgo commented 8 years ago

@benmccann

Does #63 fix this bug?

benmccann commented 8 years ago

Nope, still getting the error. Thank you for the suggestion.

witgo commented 8 years ago

We don't have enough machines to test this bug; we only have four (160 cores, 1 TB of memory). In my tests I could not reproduce the error.

Here is the conf/spark-defaults.conf I used:

spark.master = yarn-client
spark.driver.memory = 30g
spark.executor.cores = 4
spark.executor.instances = 24
spark.executor.memory = 20g
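
For reference, here is a sketch of the same settings applied programmatically, using standard Spark property names. Note that spark.driver.memory (30g above) generally must be set before the driver JVM starts, i.e. via spark-submit or spark-defaults.conf, so it is omitted here:

  import org.apache.spark.SparkConf

  // Executor-side settings can be set on the SparkConf before the
  // SparkContext is created; driver memory cannot be changed this late.
  val conf = new SparkConf()
    .setMaster("yarn-client")
    .set("spark.executor.cores", "4")
    .set("spark.executor.instances", "24")
    .set("spark.executor.memory", "20g")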

Test data: http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.bz2

Test code:

  import com.github.cloudml.zen.ml.recommendation.{FMModel, FMClassification}
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.mllib.util.MLUtils

  val storageLevel = StorageLevel.MEMORY_AND_DISK
  // Load the LIBSVM data, key each example by a unique id (FMClassification
  // takes (id, LabeledPoint) pairs), and split 90/10 into train/test sets.
  val data = MLUtils.loadLibSVMFile(sc, "/witgo/kddb").repartition(96)
  val Array(trainSet, testSet) = data.zipWithUniqueId().map(_.swap).randomSplit(Array(0.9, 0.1))
  trainSet.persist(storageLevel).count()
  testSet.persist(storageLevel).count()

  val numIterations = 100
  val stepSize = 0.1
  val l2 = (0.01, 0.01, 0.01) // L2 regularization terms
  val rank = 32
  val useAdaGrad = true
  val lfm = new FMClassification(trainSet, stepSize, l2, rank, useAdaGrad, 1.0, storageLevel)

  // Train in chunks of up to 50 iterations, re-saving the model and
  // evaluating test AUC after each chunk.
  var iter = 0
  var model: FMModel = null
  while (iter < numIterations) {
    val thisItr = math.min(50, numIterations - iter)
    iter += thisItr
    if (model != null) model.factors.unpersist(false)
    lfm.run(thisItr)
    model = lfm.saveModel()
    model.factors.persist(storageLevel)
    model.factors.count()
    val auc = model.loss(testSet)
    println(f"(Iteration $iter/$numIterations) Test AUC:                     $auc%1.6f")
  }

benmccann commented 8 years ago

I'm not sure whether reproducing the bug requires 100 machines or just 100 executors. You may have more luck reproducing with:

spark.executor.cores = 1
spark.executor.instances = 120

You could probably run on AWS EMR to reproduce as well.

witgo commented 8 years ago

Even with 120 executors I still cannot reproduce the bug. It seems to be caused by something else.
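
One way to make the two environments comparable is to confirm how many executors actually registered before training, since the failure appears to correlate with executor count. A small check (getExecutorMemoryStatus is a standard SparkContext API; its map includes the driver itself):

  // Each block manager (every executor plus the driver) has one entry;
  // subtract the driver's own entry to get the executor count.
  val numExecutors = sc.getExecutorMemoryStatus.size - 1
  println(s"Registered executors: $numExecutors")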