rjagerman / glint

Glint: High performance scala parameter server
MIT License

PullFailedException in large dataset #48

Open cstur4 opened 8 years ago

cstur4 commented 8 years ago

    glint.exceptions.PullFailedException: Failed 10 out of 10 attempts to push data
        at glint.models.client.async.PullFSM.glint$models$client$async$PullFSM$$execute(PullFSM.scala:67)
        at glint.models.client.async.PullFSM$$anonfun$request$1.applyOrElse(PullFSM.scala:79)
        at glint.models.client.async.PullFSM$$anonfun$request$1.applyOrElse(PullFSM.scala:76)
        at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
        at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Glint works well on my toy dataset, but fails with this exception on a big dataset.

rjagerman commented 8 years ago

There are multiple reasons a pull or push can fail. I think it's a good idea for me to add documentation on this, as this behavior is not intuitive. The two most common reasons are:

  1. The pull request may be too large. The underlying networking library, Akka, has a configurable limit on message sizes to retain responsiveness, and messages that exceed this limit are dropped. You could try increasing this limit through the configuration (see the sketch after this list), but that is not recommended. It is better to split your pull/push requests into smaller chunks. If you are using a matrix, you can try the GranularBigMatrix wrapper, which puts an upper limit on message size and splits big pulls/pushes into smaller ones:

    val matrix = client.matrix[Double](10000000, 10000) // large matrix
    val granularMatrix = new GranularBigMatrix[Double](matrix, 10000) // maximum message size = 10000 keys
    granularMatrix.pull(someLargeArrayOfIndices)
  2. There are too many simultaneous pull requests. In this case the parameter servers are getting flooded by requests and cannot respond to all of them. Akka by default queues up requests, and if the queue becomes too large, requests will be dropped. Due to the asynchronous nature of Glint, it is very easy to send many requests in parallel; the drawback is that there is no inherent blocking or back-pressure. You can simulate back-pressure by waiting for some requests to finish. For example, a Semaphore can be used to put an upper bound on the number of open requests:

    val lock = new java.util.concurrent.Semaphore(16) // maximum of 16 open requests
    dataset.foreach { case sample =>
        lock.acquire() // Block execution here (wait for 1 of the 16 slots to open and reserve it)
        val request = vector.pull(...) // Perform pull request
        request.onComplete { case _ => lock.release() } // Release 1 of the 16 slots when the request has finished
    }

    When doing something like this, it is very important not to put a blocking operation such as lock.acquire() inside anything that may be handled by the execution context (i.e., don't put it inside a future or callback). It is possible to deadlock your system that way!
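
For reference, here is a minimal sketch of raising the Akka message size limit mentioned in point 1 (again: not recommended). The akka.remote.netty.tcp.maximum-frame-size key is Akka classic remoting's frame size setting; exactly how a custom Typesafe Config gets passed into Glint's client is an assumption here, so check Glint's configuration documentation:

    import com.typesafe.config.ConfigFactory

    // Raise Akka classic remoting's maximum frame size to 10 MiB.
    // NOTE: how this config is handed to the Glint client is an
    // assumption; verify against Glint's configuration docs.
    val customConfig = ConfigFactory.parseString(
      "akka.remote.netty.tcp.maximum-frame-size = 10 MiB"
    )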

Let me know if this helps your problem. If not, we'll try to resolve it some other way. I'd be very interested in seeing your code to check whether I can help out there.

cstur4 commented 8 years ago

I have increased the message size limit in Akka. If the failure is due to too many simultaneous pull requests, I am a bit confused: I use the same number of partitions, and the number of pull requests per second should not change, because every pull request within a partition is synchronous.

cstur4 commented 8 years ago

I tried adding more servers, but it does not help.

cstur4 commented 8 years ago

After going through the source code, I found the reason is the second one. In the big dataset, the number of features is much larger than in the toy dataset. After I decreased the number of features, it works.

I think the hot features are the bottleneck, for example the bias term. But adding more servers or decreasing the number of features cannot spread that pressure. I cannot understand why the number of features has an effect.

cstur4 commented 8 years ago

I finally reduced the number of features, and it works as expected.

MLnick commented 7 years ago

Just a note on https://github.com/rjagerman/glint/issues/48#issuecomment-237158188 (2): the lock (semaphore) will be shipped to each partition and deserialized. So it seems to me that solution won't limit the overall number of concurrent requests, only the number per partition. Is that the intention?

Otherwise, the only way to limit things is via thread pool size?

rjagerman commented 7 years ago

So, the code example is pretty unclear, my apologies: the dataset is not a Spark RDD, but a single partition of the data. A better example would probably be:

rdd.foreachPartition { case partition =>
    val lock = new java.util.concurrent.Semaphore(16) // maximum of 16 open requests per partition
    partition.foreach { case sample =>
        lock.acquire() // wait for 1 of the 16 slots to open and reserve it
        val request = vector.push(...)
        request.onComplete { case _ => lock.release() } // free the slot when the request finishes
    }
    lock.acquire(16) // wait for all open requests to finish...
    lock.release(16) // ...and hand the permits back
}

A major challenge in asynchronous computing is the lack of backpressure. Unless you implement some sort of signal it is difficult to know when you are sending too many requests. It's a careful balancing act between optimizing CPU utilization and network communication. I haven't found a great solution for this and consider it an open research question. Existing parameter server literature does not really address this.

Most solutions in Akka that deal with backpressure are complicated and have the consumer signaling the producer to slow down. This is difficult to do in Glint, because the library can't necessarily stop someone from queueing up more pull or push requests without making these functions blocking and thus synchronous.

The Semaphore solution works because it limits the number of open requests, forcing the parameter server to finish processing some of them before we queue up more of them. However, it is very difficult to guess the optimal size of the semaphore as this heavily depends on your specific setup (I use 16 here because it works for me)... I hope this helps somewhat.

Note that even if you limit the size of a fixed thread pool, your code can still queue up more requests; they will be placed in an unbounded queue (depending on your specific ExecutionContext), which will cause eventual out-of-memory problems. More info about this is in the Java 8 docs. You could force your ExecutionContext to use a bounded blocking queue (such as done here). I would however highly recommend against this, because you tread into the realm of deadlocks, which are very difficult to find and debug...
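
To make that concrete, here is a minimal sketch of an ExecutionContext with a bounded work queue, built on the standard java.util.concurrent API. Instead of a strictly blocking queue it uses CallerRunsPolicy, which runs a task on the submitting thread when the queue is full and thereby slows the producer down (a crude form of back-pressure); the pool and queue sizes are arbitrary:

    import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
    import scala.concurrent.ExecutionContext

    // Fixed pool of 4 threads with a bounded queue of 128 tasks. When the
    // queue is full, CallerRunsPolicy executes the task on the submitting
    // thread, which naturally throttles request production.
    val executor = new ThreadPoolExecutor(
      4, 4, 0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue[Runnable](128),
      new ThreadPoolExecutor.CallerRunsPolicy
    )
    implicit val boundedContext: ExecutionContext =
      ExecutionContext.fromExecutor(executor)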

MLnick commented 7 years ago

Thanks, that is helpful. Yes, good point on the thread pools. I also saw that playing with thread pool size didn't help and in fact hurt performance.

What I've found most important is trying to achieve a good spread of features (really the "hot features") across model partitions.

The above approach will work well with "mini-batch" training within each partition. I won't have time to implement it, but I think it could work well: it could be completely async per mini-batch, with a limit on the overall number of open requests. Or perhaps a "sliding window" over pull futures, such that while the current batch is being computed, the next pull request is already completing in the background.
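
A minimal sketch of that sliding-window idea, under stated assumptions: vector.pull takes an array of keys and returns a Future of the weights (as in the examples above), and Sample, indicesOf and train are hypothetical placeholders for the model-specific parts:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    case class Sample(features: Array[Long], label: Double)           // hypothetical
    def indicesOf(batch: Seq[Sample]): Array[Long] = ???              // hypothetical helper
    def train(batch: Seq[Sample], weights: Array[Double]): Unit = ??? // hypothetical
    def vector: glint.models.client.BigVector[Double] = ???           // the Glint vector from above

    def slidingWindowTrain(batches: Iterator[Seq[Sample]]): Unit =
      if (batches.hasNext) {
        var current = batches.next()
        var pending: Future[Array[Double]] = vector.pull(indicesOf(current))
        while (batches.hasNext) {
          val next = batches.next()
          val nextPull = vector.pull(indicesOf(next))     // start the NEXT pull...
          val weights = Await.result(pending, 30.seconds) // ...while waiting on the current one
          train(current, weights)                         // compute while nextPull is in flight
          current = next
          pending = nextPull
        }
        train(current, Await.result(pending, 30.seconds)) // last batch
      }

Note that the Await.result call blocks the worker thread directly rather than inside a callback, which stays on the safe side of the deadlock warning earlier in this thread.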