amplab / SparkNet

Distributed Neural Networks for Spark

Performance issue due to spark #118

Closed nhe150 closed 8 years ago

nhe150 commented 8 years ago

1) In the training code, the call `val it = trainIt.drop(startIdx)` inside `trainDF.foreachPartition { trainIt => ... }` is the most time-consuming part (roughly an 8:1 ratio of drop time to all other computation). This seems to be an inherent issue of the Spark framework. Should someone on the Spark side take a look at this issue? A rough sketch of the hot loop is shown below.
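For reference, here is roughly what the loop looks like with timing added (a sketch only; `startIdx` and the training step are stand-ins for what CifarApp actually does):

```scala
trainDF.foreachPartition { trainIt =>
  val dropStart = System.currentTimeMillis()
  val it = trainIt.drop(startIdx)               // this drop dominates the iteration
  val dropTime = System.currentTimeMillis() - dropStart
  // ... the actual training step consumes `it` here ...
  println(s"drop took $dropTime ms")            // roughly 8x the training-step time
}
```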

robertnishihara commented 8 years ago

Thanks Norman! Are you running ImageNetApp? I think in this case the problem is that we forgot to cache the DataFrame, so sometimes it ends up getting recomputed which is expensive. This can be fixed by caching the relevant DataFrames. We fixed this for ImageNetApp in #117. Are you using the current master?

nhe150 commented 8 years ago

Hi Robert, I am running the CifarApp from the latest master branch. It seems to me the cache is already in place:

trainDF = trainDF.repartition(numWorkers).cache()

But please do verify. Let's make sure the "stuff took" vs. "iters took" ratio is around 8:1 when you run your test on CifarApp with all the caching in place.

robertnishihara commented 8 years ago

Yes, you're right: in CifarApp.scala, it looks like the line val it = trainIt.drop(startIdx) takes around 1 second, so there is some overhead from Spark here. This isn't a huge problem for us because even if that call were instantaneous, the line

netWeights = workers.map(_ => { workerStore.get[CaffeSolver]("solver").trainNet.getWeights() }).reduce((a, b) => CaffeWeightCollection.add(a, b))

which collects the weights from the workers, and the line

val broadcastWeights = sc.broadcast(netWeights)

which broadcasts the weights could still be slow. The Cifar model is small enough that those lines are actually very quick, but for larger models they will take multiple seconds.

We can amortize the overhead from all of these calls by increasing syncInterval. In particular, for ImageNetApp.scala, where the overhead is more of a problem, we set syncInterval = 50.
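To make the amortization concrete, the driver loop is roughly the following (a sketch only, not the actual ImageNetApp code; the worker-side training step and the averaging of the summed weights are elided, and variables like workers and numIterations are taken from context):

```scala
// Sync the model only every syncInterval iterations, so the collect and
// broadcast costs are paid once per syncInterval local training steps.
var netWeights = workers.map(_ => {
  workerStore.get[CaffeSolver]("solver").trainNet.getWeights()
}).reduce((a, b) => CaffeWeightCollection.add(a, b))

var i = 0
while (i < numIterations) {
  val broadcastWeights = sc.broadcast(netWeights)      // broadcast cost paid once here ...
  workers.foreach(_ => {
    // each worker loads broadcastWeights.value into its local solver, then
    // runs syncInterval training steps before the next synchronization
  })
  netWeights = workers.map(_ => {
    workerStore.get[CaffeSolver]("solver").trainNet.getWeights()
  }).reduce((a, b) => CaffeWeightCollection.add(a, b)) // ... and the collect cost once here
  i += syncInterval
}
```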

nhe150 commented 8 years ago

Thank you for pointing out the problem of large-model communication overhead. 1) Amortization works very well, based on the PSGD results published in the paper. 2) There is RDMA-based communication for Spark: http://hibd.cse.ohio-state.edu/#spark However, if the above two do cut down the model communication overhead, the remaining Spark overhead will become a huge issue.

I will try ImageNet to measure the large-model communication overhead.

nhe150 commented 8 years ago

Hi Robert, I have tried ImageNet. The results are very promising; I would like to investigate more and will share the results. If everything turns out correct, I do believe the 1-second drop time is a huge problem (because it cannot be amortized). The best approach to solving the drop delay is to engage the Spark (or Databricks) people to figure out how to speed up drop() from 1 second to milliseconds.

robertnishihara commented 8 years ago

Awesome! Please do share the results.

We should do more in-depth profiling. For the overhead from the drop call, you're right that we'd want to talk to the Spark people. There's probably also overhead from the calls to transformInto, which could likely be optimized a bit.

nhe150 commented 8 years ago

Right on! transformInto is key for DataFrame integration and data distribution. Even though transformInto is slow now, it is worth optimizing because of the ease of use and adoption it enables.

robertnishihara commented 8 years ago

Agreed, it is very important for generality! It's needed any time you want to transform your data after reading it from a DataFrame and before feeding it into a network (e.g., subtracting the mean, converting Array[Byte] -> Array[Float], or decompressing JPEGs).
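As a generic sketch of that kind of transform (not the actual transformInto code), converting raw image bytes to floats and subtracting a precomputed mean might look like:

```scala
// Generic sketch (not SparkNet's actual code): convert raw bytes to floats
// and subtract a precomputed per-element mean.
def preprocess(image: Array[Byte], mean: Array[Float]): Array[Float] = {
  val out = new Array[Float](image.length)
  var i = 0
  while (i < image.length) {
    out(i) = (image(i) & 0xFF).toFloat - mean(i)  // & 0xFF because JVM bytes are signed
    i += 1
  }
  out
}
```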