rjagerman / glint

Glint: High performance Scala parameter server

Rework of Glint internals #55

Open rjagerman opened 7 years ago

rjagerman commented 7 years ago

I've been unexpectedly busy these last summer months, so unfortunately I did not find much time to work on Glint. There has been some interesting discussion from users in issues #54, #49 and #28; thank you for your feedback! Starting in November I can free up time in my regular schedule to dedicate to the project.

To address some of the points that contributors and users have raised, I plan to make some pretty big changes to Glint's internal workings. I'm opening this up for discussion here, so feel free to speak your mind. These are just some of the ideas I have, and I'd love to hear your feedback, concerns and comments on them.

1) Integration with Spark

I have been working on a very simple proof of concept to simplify integration with Spark. In this concept you include Glint as a library in your application by adding a line to your build.sbt file, and then call glint.run(sparkcontext) somewhere in your code to start the entire Glint system (master and parameter servers) on your existing Spark cluster (see the sketch below). This has several big advantages:

  1. It makes Glint much easier to use. You don't have to run it as separate Java processes.
  2. It enables us to very easily ship user-defined functions to the parameter server because the compiled jar gets distributed by Spark for us. This enables custom processing at the parameter servers to simplify things like L1 regularization.

The major drawback is that we would no longer be able to run Glint as a stand-alone service. This change makes Glint dependent on Spark to ship functions across machines, so we cannot run it outside of Spark. Nevertheless, I feel the benefits definitely outweigh the disadvantages.
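
To make this concrete, here is a minimal sketch of what user code could look like. Note that glint.run is the proposed entry point and does not exist yet; the client API is assumed to mirror the current one:

import org.apache.spark.{SparkConf, SparkContext}

object GlintOnSpark {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("glint-example"))

        // Proposed entry point: starts the Glint master and the parameter
        // servers on the existing Spark cluster and returns a client.
        val client = glint.run(sc)

        // The client is then used as usual, e.g. to construct a
        // distributed vector partitioned across the parameter servers.
        val vector = client.vector[Double](100000)
    }
}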

2) Setting up benchmarks on simple machine-learning tasks

To measure the impact of future changes, we should have several standard machine-learning tasks on various data sizes (linear SVM, logistic regression, etc.), and perhaps even very simple tasks such as an all-reduce. It is important to establish a baseline here, so that progress can be accurately measured.
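
Even a tiny timing harness would get us started. A rough sketch, where the task argument stands in for whatever benchmark we run (e.g. an all-reduce over a distributed vector):

// Minimal timing harness, for illustration: runs a task several
// times after a warm-up and reports the median wall-clock time.
def benchmark(name: String, warmup: Int = 3, runs: Int = 10)(task: => Unit): Unit = {
    (1 to warmup).foreach(_ => task)
    val timesMs = (1 to runs).map { _ =>
        val start = System.nanoTime()
        task
        (System.nanoTime() - start) / 1e6
    }.sorted
    println(f"$name: median ${timesMs(runs / 2)}%.2f ms over $runs runs")
}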

3) Breeze dense & sparse vectors/matrices instead of Arrays

Currently all model data is stored as dense Arrays to keep garbage collection overhead from becoming extreme. However, sparse models have significant advantages in terms of scalability. I propose a rework of the internals of the PartialMatrix and PartialVector actors to use Breeze's dense and sparse classes under the hood. We can then return Breeze structures as the result of pull requests; they should be easier to use than Arrays for machine learning applications. To manage possible garbage collection problems, we could eventually look into things like scala-offheap.
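
For a sense of what this buys us, a small illustrative Breeze example:

import breeze.linalg.{DenseVector, SparseVector}

// Dense storage suits small, fully populated partitions of the model.
val dense = DenseVector.zeros[Double](1000)
dense(0) = 1.0

// Sparse storage scales to very large, mostly-zero partitions.
val sparse = SparseVector.zeros[Double](1000000)
sparse(42) = 3.14

// Breeze provides the usual linear-algebra operations on both, which
// is more convenient for ML code than working with raw Arrays.
val squaredNorm = dense dot dense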

4) Fault tolerance

If a parameter server goes down, you lose that part of the model. Depending on your algorithm this may range from "we can recover" to "very very bad news". Other parameter server implementations use a Chord-like DHT structure that replicates the model across multiple machines to mitigate system failures (see figure 8 of Li et al. (2014)). However, this is likely to cause significant performance degradation, because push requests need to go through all replicated models on other machines. We can of course vary the replication factor to balance fault tolerance against performance. This will probably take the longest, as it is quite complicated to implement. Nevertheless, it seems like an important thing to do to bring Glint to a larger audience.
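
A rough illustration of the trade-off (the Replica trait is hypothetical, purely for the sketch):

// Hypothetical interface, for illustration only.
trait Replica {
    def push(keys: Array[Long], values: Array[Double]): Unit
}

// Every push fans out to the primary plus (replicationFactor - 1)
// replicas: a higher factor buys more fault tolerance at the cost of
// proportionally more network traffic per push.
def replicatedPush(keys: Array[Long],
                   values: Array[Double],
                   replicas: Seq[Replica],
                   replicationFactor: Int): Unit =
    replicas.take(replicationFactor).foreach(_.push(keys, values))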

5) Documentation

The documentation is in a bit of an awkward place right now. Several parts are missing, and other parts need to be clarified or rewritten. With the changes proposed above, the documentation could change quite significantly, so I don't want to spend a lot of time writing documentation that will soon be obsolete. If anyone has a good suggestion for a documentation framework from which we can easily publish to gh-pages, I'd love to hear it. I'm currently toying around with Sphinx and it works decently well; suggestions are welcome.

codlife commented 7 years ago

very cool

MLnick commented 7 years ago

For (1), do you mean effectively using Spark's functionality for shipping closures across the network? I agree this is helpful, as Spark handles the cleaning of closures etc. (and has gone through a lot of effort to make that work). It may be worth looking at the spores project too (https://github.com/heathermiller/spores), though I think it is perhaps still a bit rough.

How do you propose this works exactly? So let's say I do async SGD, which could look something like this:

val vector = client.vector[Double](dim)
data.foreachPartition { iter =>
    val localData = iter.toIterable
    for (i <- 1 to iterations) {
        val w = vector.pull(keys)          // get latest weights
        val grad = localData.foldLeft(...) // compute gradient
        val update = -localStepSize * grad
        vector.push(keys, update)
    }
}

Now on the param servers I need to update w by w = w - step * w * regParam, where w has already been updated with update, the (scaled) gradient from the worker. Do you propose using Spark for this, something like:

paramServers.foreach { ps =>
    val w = ps.getVector(...)
    val newW = w - step * w * regParam
    ps.setVector(newW)
}

MLnick commented 7 years ago

Could another option for (1) be an interface for adding Actors in a chain? Most PS implementations have some concept of filters as well as UDFs. Thinking about it, both could potentially be thought of as Actors within a chain of transformations (the Akka Streams type of model).

Then a UDF could be implemented by providing an Actor that knows how to e.g. apply the L1 proximal operator to a vector.
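
For instance, a sketch of such an actor (the Transform message type is made up for illustration), applying soft-thresholding, i.e. the L1 proximal operator, to a slice of the model:

import akka.actor.Actor

// Hypothetical message carrying a slice of model values to transform.
case class Transform(values: Array[Double])

// An Actor in the transformation chain that applies the L1 proximal
// operator (soft-thresholding) before passing the result back.
class L1ProximalActor(lambda: Double) extends Actor {
    private def softThreshold(x: Double): Double =
        math.signum(x) * math.max(math.abs(x) - lambda, 0.0)

    def receive: Receive = {
        case Transform(values) =>
            sender() ! Transform(values.map(softThreshold))
    }
}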

MLnick commented 7 years ago

In terms of (4), I think it may be most efficient to see if the built-in Akka fault tolerance, persistence and replication tools can be used.
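
For example, Akka Persistence could let a partial model rebuild its state after a crash by replaying a journal of updates. A very rough sketch (the message type and overall design are assumptions, not a worked-out proposal):

import akka.persistence.PersistentActor

case class Push(keys: Array[Int], values: Array[Double])

// Sketch: a partial vector that journals every push, so that after a
// crash its state can be restored by replaying the journal.
class PersistentPartialVector(id: String, size: Int) extends PersistentActor {
    override def persistenceId: String = id
    private val data = new Array[Double](size)

    private def applyPush(p: Push): Unit =
        p.keys.zip(p.values).foreach { case (k, v) => data(k) += v }

    override def receiveRecover: Receive = {
        case p: Push => applyPush(p)
    }

    override def receiveCommand: Receive = {
        case p: Push => persist(p)(applyPush)
    }
}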

codlife commented 7 years ago

Hello @MLnick, I'm working on asynchronous SGD, but I'm just starting out and there is much more work to do: https://github.com/CASISCAS/asyspark

rjagerman commented 7 years ago

About (1): good point about chained actors, this could be a very elegant solution. One of the reasons I want to integrate with Spark is that user-provided bytecode (whether in the form of UDFs or custom actors) needs to be distributed to the parameter servers and loaded at runtime. Spark handles this quite nicely by distributing the jar and dynamically loading it into the classpath at runtime. This prevents the "class unknown" errors that you'd normally get when trying to serialize functions/classes across the network to different JVMs. Serializing user-defined functions/classes in a distributed setting is quite difficult: even an extremely simple anonymous function like (x, y) => x + y can fail with a serialization error when shipped to a JVM that does not have the compiled class on its classpath.
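
To illustrate the failure mode (a sketch; the class name in the comment is an example):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// An anonymous function compiles to a synthetic class (something like
// MyApp$$anonfun$1). Java serialization records only the class name
// and field values, never the bytecode itself.
val add: (Int, Int) => Int = (x, y) => x + y

val bos = new ByteArrayOutputStream()
new ObjectOutputStream(bos).writeObject(add) // fine on this JVM

// Deserializing these bytes on a remote JVM whose classpath lacks our
// jar throws a ClassNotFoundException, because the synthetic class
// cannot be loaded. Spark avoids this by shipping the application jar
// to every executor and loading it at runtime.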

The Akka Streams model seems very appropriate, but unfortunately it doesn't work with remote/network-based/TCP Akka actors (only those within the same JVM), probably for the same reason outlined above.

For (4), we should maybe look at different consistency models for different use-cases. I can identify two here that need completely different replication behavior:

  1. In the case of collapsed Gibbs sampling (LDA), we'd need all replicas to be exactly up to date at all points in time. This is because a mismatch between the count table and the local topic assignments could mess up the algorithm (you can theoretically get negative counts if you are unlucky enough).
  2. For stochastic algorithms, if the model gets set back to a previous point in time during a failure, it is not a very big deal, since you just lose a few stochastic updates. We can do periodic model replication, which doesn't have to take up much network bandwidth. This is a much easier scenario to implement in my opinion, and the way to go for now.
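
A sketch of what periodic replication could look like (the snapshot and replicate hooks are placeholders):

import java.util.concurrent.{Executors, TimeUnit}

// Sketch: every interval, copy the current partition to a replica.
// If the primary fails, the replica is at most one interval of
// stochastic updates behind, which these algorithms tolerate well.
def schedulePeriodicReplication(snapshot: () => Array[Double],
                                replicate: Array[Double] => Unit,
                                intervalSeconds: Long): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
        () => replicate(snapshot()),
        intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
}
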
rjagerman commented 7 years ago

Actually, now that I look at the most recent documentation, there seems to have been some progress on Akka Streams over the network (e.g. TCP-based streaming): http://doc.akka.io/docs/akka/2.4.10/scala/stream/stream-io.html