dask / dask-glm

BSD 3-Clause "New" or "Revised" License
76 stars 46 forks source link

WIP: ENH: Basic implementation of SGD #69

Open stsievert opened 6 years ago

stsievert commented 6 years ago

This implements the popular mini-batch stochastic gradient descent in dask-glm. At each iteration, it grabs batch_size examples, computes the gradients for these examples, and then updates the parameter accordingly.

The main benefit of SGD is that the convergence time does not depend on the number of examples. Here, "convergence time" means "number of arithmetic operations", not "wall-clock time until completion".

TODOs:

mrocklin commented 6 years ago

This is cool to see. Have you tried this on a dataset, artificial or otherwise? Do you have a sense for the performance bottlenecks here? The first thing I would do here would be to run this with the distributed scheduler on a single machine and watch the dashboard during execution. I suspect that it would be illiuminating

stsievert commented 6 years ago

@mrocklin and I have had some discussion over at https://github.com/dask/dask/issues/3251 about this on one of the bugs that I encountered while implementing this. The takeaway from this discussion was that I should randomly shuffle the dataset every epoch and walk sequentially through it, not sample the dataset at random.

I'm now curious what kinds of parallel SGD algorithms exist. https://github.com/dask/dask/issues/3251#issuecomment-372148839

Most parallel SGD algorithms I've seen rely on some sparsity constraints (e.g., Hogwild!, Hogwild++, Cyclades). Practically, there's a much easier method to parallelize SGD: parallelize the gradient computation for a mini-batch. In practice for deep learning at least, that's the main bottleneck.

futures samples asynchronously to overlap communication and computation

Got it. I see what you're saying. The futures API should make that pretty convenient.

We are computing the gradient for different examples, then summing them. Could we use map_blocks to compute the gradient locally for each chunk, then send the gradients computed from each chunk back to the master? Or maybe a better question: would this change anything?

(I meant to send this in as soon as PR filed; sorry the delay)

stsievert commented 6 years ago

Have you tried this on a dataset, artificial or otherwise? Do you have a sense for the performance bottlenecks here?

This is a very early work (the code isn't even error-free), and I only filed this PR to move the discussion from the other unrelated issue. I'll look more into performance bottlenecks after I finish the implementation.

mrocklin commented 6 years ago

My understanding of Hogwild! and similar algorithms is that they expect roundtrip latencies in the microseconds, which is, for them, a critical number for performance. Is this understanding correct? If so then how does a dynamic distributed system like Dask (which has latencies in the several milliseconds) manage?

The takeaway from this discussion was that I should randomly shuffle the dataset every epoch and walk sequentially through it, not sample the dataset at random

How many epochs are there? Are you expected to use all of the data every epoch? You should be aware that while shuffling is much faster than n random accesses, it's still quite slow. There might be other approaches, like gathering a sizable random sample (like 10% of the data) 10x more frequently.

This is a very early work (the code isn't even error-free), and I only filed this PR to move the discussion from the other unrelated issue. I'll look more into performance bottlenecks after I finish the implementation.

Understood, and I really appreciate the early submission to stimulate discussion. I think that most of my questions so far have been aiming to to the following point: Our cost model has changed dramatically from what it was on a multi-core machine. We should build intuition sooner rather than later to help inform algorithm decisions.

We are computing the gradient for different examples, then summing them. Could we use map_blocks to compute the gradient locally for each chunk, then send the gradients computed from each chunk back to the master? Or maybe a better question: would this change anything?

Sure, that's easy to do, that seems quite different than the SGD approach you were mentioning earlier though that avoids looking at all of the data on each iteration.

mrocklin commented 6 years ago

In case you haven't seen it already you might want to look at https://github.com/dask/dask-glm/issues/57

mrocklin commented 6 years ago

Ah, as before I see that you've already covered this ground: https://github.com/dask/dask-glm/issues/57#issuecomment-317128537

stsievert commented 6 years ago

expect roundtrip latencies in the microseconds, which is, for them, a critical number for performance. Is this understanding correct?

In the literature the latency unit is number of writes to the parameter vector (e.g., section 2.1 of taming the wild). In the limiting case where bandwidth is an issue, I don't see how halving the communication bandwidth could be an issue (but it could be if stragglers are considered).

How many epochs are there? Are you expected to use all of the data every epoch? You should be aware that while shuffling is much faster than n random accesses, it's still quite slow

Normally less than 100 (~100 for CIFAR-10 or ImageNet, ~10–20 for MNIST). In the case of the taxicab dataset, I'd imagine less: it's a simple model with many data points.

Is that acceptable? I'm okay shuffling the data completely infrequently and shuffling on each machine infrequently. Either way, I think this will provide gains over computing the gradient for all examples.

Our cost model has changed dramatically from what it was on a multi-core machine. We should build intuition sooner rather than later to help inform algorithm decisions.

Agreed. We should avoid premature optimization and fix the problems we see.

mrocklin commented 6 years ago

Full shuffles on the nyc taxi cab dataset on a modest sized cluster take a minute or so. Pulling a random GB of data or so from the cluster and then shuffling it locally would likely take a few seconds. I could also imagine doing shuffling asynchronously while also pulling data locally.

mrocklin commented 6 years ago

Have you had a chance to run this while looking at the dashboard? If not let me know and we can walk through this together. I think that it will help.

stsievert commented 6 years ago

Yeah, I've had a chance to look at the dashboard (after making sure the code optimizes successfully). It looks like indexing is the bottleneck, I believe it spends about 67% of the time there. At least, that portion of the dashboard remains constant when "getitem" is selected instead of "All" from the dropdown menu.

I think something is high bandwidth too, some task is using up to 20Mb/s. That seems high, especially since the iterations are not fast, my beta is only 20 elements and 32 examples are used to approximate the gradient. Could indexing be responsible for this? I don't think dot products or scalar functions are responsible.

I think we can use map_blocks to get around this issue (well, if this is an issue), which would rely on the fact that gradient(X, beta) = sum(gradient(x, beta) for x in X) for most losses (including GLMs). This would only require the communication of beta, which has the same communication cost as 1 example (at least for a linear model).

stsievert commented 6 years ago

I've played with this a little more, encountered some performance issues and thought of some (possible) solutions. Shuffling the arrays (algorithms.py#L189) takes a long time on my local machine, even if it's once every epoch. This issue can be resolved by moving the indexing to each array chunk, which means that we wouldn't ever have to move the entire dataset.

This would need to make one assumption: that each row in X is not split into different blocks, or the chunks are only along the first dimension. SGD is designed for when the number of examples is very large, and it's typically much larger than the dimension of each example (e.g., the NYC taxicab dataset: 19 features, millions of examples).

The implementation would look something like

def _sgd_grad(family, Xbeta, X, y, idx, chunks, block_id=0):
    i = _proper_indices(idx, block_id, chunks)
    return family.grad(Xbeta[i], X[i], y[i])

for k in range(100):
    i = np.random.choice(n, size=(batch_size,))
    grad = da.map_blocks(_sgd_grad, famliy, Xbeta, X, y, i, X.chunks)
    grad = grad.reshape(...).sum(axis=...)
    ...

I think I'll point to this idea tomorrow @mrocklin.

stsievert commented 6 years ago

There is a dask/sklearn/skimage sprint soon, and will help achieve some of the ML goals: https://github.com/scisprints/2018_05_sklearn_skimage_dask/issues/1#issue-318675878, specifically on scaling to larger datasets. I'll try to put in time during the sprint to implement this, regardless if I'm unable to attend (which may happen for a variety of concerns).

cc @mrocklin

mrocklin commented 6 years ago

I agree that this would be a useful topic. I suspect that it is also something that people at the sprint might find interesting to work on or use. I may find some time before the sprint to put together some infrastructure to make experimentation easier here. I may raise this as a broader topic of discussion at dask-ml.

On Sat, May 5, 2018 at 2:21 PM, Scott Sievert notifications@github.com wrote:

There is a dask/sklearn/skimage sprint soon, and will help achieve some of the ML goals: scisprints/2018_05_sklearn_skimage_dask#1 (comment) https://github.com/scisprints/2018_05_sklearn_skimage_dask/issues/1#issue-318675878, specifically on scaling to larger datasets. I'll try to put in time during the sprint to implement this, regardless if I'm unable to attend (which may happen for a variety of concerns).

cc @mrocklin https://github.com/mrocklin

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/dask-glm/pull/69#issuecomment-386825217, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszGdHsmaGWeE5IagS85rF7g8zcTqKks5tve2kgaJpZM4Sl3kx .

stsievert commented 6 years ago

I've updated this, with help from https://github.com/dask/dask/pull/3407. This implementation relies on slicing a Dask array with a NumPy array. I'm not convinced this is the best approach. I generate something like 200k jobs when I index with Dask slices for a feature matrix with size (1015701, 20) and a batch size of 1000.

I'll try to rework for the re-indexing approach mentioned in revisiting https://github.com/dask/dask-glm/pull/69#discussion_r177179296.