dask / dask-ml

Scalable Machine Learning with Dask
http://ml.dask.org
BSD 3-Clause "New" or "Revised" License

Integration with deep learning frameworks #268

Open lesteve opened 6 years ago

lesteve commented 6 years ago

Collecting related thoughts from https://github.com/dask/dask-ml/issues/210.

@stsievert in https://github.com/dask/dask-ml/issues/210#issuecomment-401836989

Especially with PyTorch. It certainly feels like there should be an integration with PyTorch because it has torch.distributed and torch.multiprocessing

@stsievert in https://github.com/dask/dask-ml/issues/210#issuecomment-397023505

Optimization:

  • I see these algs (e.g., Hogwild) as exploiting dask's distributed architecture
  • These will require a parameter server. Can we make this general and integrate with (for example) CuPy/Chainer and PyTorch?

@TomAugspurger in https://github.com/dask/dask-ml/issues/210#issuecomment-397005038

Dask-Tensorflow

  • Review new datasets API, anything we should do there?
TomAugspurger commented 6 years ago

Thanks @lesteve!

I'm relatively unfamiliar with these libraries, especially with their distributed runtimes, where Dask may be most useful.

I'd be curious to hear from people who have experience here.

mrocklin commented 6 years ago

I'm also unfamiliar with using any modern deep learning library. I'd love to hear from people who do use them, want to use them in a distributed way, and have experienced some pain while trying to do so. (also cc'ing @bnaul @seibert)

My current understanding is that people do the following:

  1. Build up a model. For this they use Tensorflow, Keras, PyTorch, etc.. I don't think Dask has any role to play here
  2. Choose an optimizer, like AdaGrad. Dask might play a role here, or it might modify this optimizer, similar to the approach taken in Horovod

    import tensorflow as tf
    import horovod.tensorflow as hvd

    hvd.init()  # initialize Horovod before querying hvd.size()

    # Build model...
    loss = ...
    opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

    # Add Horovod Distributed Optimizer
    opt = hvd.DistributedOptimizer(opt)
  3. Load and preprocess data. Numpy/Pandas is probably active here, maybe Dask as well (see the sketch after this list)
  4. Hand that data to the model/optimizer for training
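For steps 3 and 4, here is a rough sketch of where Dask could slot in (the parquet path and column names are hypothetical):

    import dask.dataframe as dd

    # Step 3: load and preprocess lazily with Dask.
    ddf = dd.read_parquet("s3://bucket/training-data/*.parquet")   # hypothetical path
    features = ddf.drop(columns=["label"])
    mean, std = features.mean().compute(), features.std().compute()
    features = (features - mean) / std   # each partition normalized via pandas broadcasting

    # Step 4: materialize the data and hand it to the model/optimizer chosen in steps 1-2.
    X = features.compute().to_numpy()
    y = ddf["label"].compute().to_numpy()
    # model.fit(X, y)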
mrocklin commented 6 years ago

Personally, I'm curious to see what workflows combining Dask and a deep learning framework would look like. Writing those down is something productive that people can do now, and it might help to focus the discussion.

lesteve commented 6 years ago

I guess this is not really integration per se so I opened a different issue #281 about the use case that some people around me are trying to tackle by combining dask and deep learning frameworks.

mrocklin commented 6 years ago

Questions like that are welcome. It's nice to identify issues that come up in practice, even if they are less research-y.

arthurmensch commented 6 years ago

Currently, PyTorch's only documented way of doing Hogwild training is to create Process() objects directly and share the model between them. See for instance https://github.com/pytorch/examples/blob/master/mnist_hogwild/main.py.

This is not compatible with the modern approach of using Dask pools, or even multiprocessing pools. I don't really see a simple way around that.
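For reference, the pattern in that example boils down to roughly the following (a toy sketch, not the actual MNIST code):

    import torch
    import torch.multiprocessing as mp

    def train(model):
        # Each process optimizes the *shared* parameters in place (Hogwild-style).
        opt = torch.optim.SGD(model.parameters(), lr=0.01)
        for _ in range(100):
            x = torch.randn(32, 10)
            y = torch.randint(0, 2, (32,))
            loss = torch.nn.functional.cross_entropy(model(x), y)
            opt.zero_grad()
            loss.backward()
            opt.step()

    if __name__ == "__main__":
        model = torch.nn.Linear(10, 2)   # toy model standing in for a real network
        model.share_memory()             # move the parameters into shared memory
        procs = [mp.Process(target=train, args=(model,)) for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()

The explicit Process() handling and the shared-memory model are exactly the parts that do not map onto a pool-style API.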

corrado9999 commented 6 years ago

Here is a use case from my professional experience where I believe dask-ml would be great.

I work with remote sensing data. If you are thinking of Google Maps, that is only part of the story. Getting to the point: we have images of several gigabytes (tens of thousands of pixels, tens of channels) acquired tens of times each year. Thanks to NASA and ESA (the European Space Agency), we have 4 great pools of images available for free: MODIS, LANDSAT-8, Sentinel-1 and Sentinel-2.

Now, I am facing the issue of training a deep network on these data. They are all available on AWS (https://registry.opendata.aws/?search=satellite%20imagery), so it makes perfect sense to avoid downloading and to train the model directly in the cloud. As features we use bands from one (or more!) data source, but time must also be taken into account in some way (this is not something dask can help with, but it could somehow affect the design).

The training is usually supervised, the target being classification (water, urban, grassland, cropland, ...). We are interested in 3 main kinds of classification:

  1. Pure pixel based: each pixel is assigned to one class, without knowing where it is located
  2. Pixel based with neighborhood: this is where CNNs come into play, because we want to exploit information from nearby pixels in order to estimate some spatial features
  3. Object based: we aggregate pixels based on some vector data (known a priori)

Please take into account that we have a lot of data, and that data storage usually costs more than processing. So, in some cases it may be preferable to perform data preprocessing/normalization at each batch, while in other cases we may prefer to cache it (instance based). For the same reason, along and across epochs we would like to minimize data movement, yet mix batches coming from different images in as many ways as possible.

Sorry for the long post, I hope I have been clear enough about my use case.

joemcglinchy commented 5 years ago

I also use primarily remote sensing data, but my use case is more about the model inference stage. Suppose you have already trained a model to your satisfaction. Training is typically done with smaller samples, which could absolutely be supported by dask using a windowed approach. Once the model is trained, I would like to apply it to a remote sensing dataset, which is typically quite large (think tens of thousands of rows and columns). Loading that full dataset into memory is often problematic, so I think dask could help here as well.
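A rough sketch of what that chunked inference could look like with dask.array (the random image and the tiny scikit-learn model below are only stand-ins for a real raster and a real trained model):

    import dask.array as da
    import numpy as np
    from sklearn.linear_model import LogisticRegression

    # Stand-ins for a trained model and a large multi-band raster.
    model = LogisticRegression().fit(np.random.random((100, 8)),
                                     np.random.randint(0, 2, 100))
    image = da.random.random((20_000, 20_000, 8), chunks=(2_048, 2_048, 8))

    def predict_block(block):
        rows, cols, bands = block.shape
        labels = model.predict(block.reshape(-1, bands))   # (pixels, bands) -> labels
        return labels.reshape(rows, cols)

    # One class per pixel, computed chunk by chunk without loading the full image.
    predictions = image.map_blocks(predict_block, drop_axis=2, dtype="int64")
    window = predictions[:1_000, :1_000].compute()          # evaluate just one window

dask_ml.wrappers.ParallelPostFit wraps a similar idea for scikit-learn-style estimators operating on Dask collections.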

TomAugspurger commented 5 years ago

Thanks Joseph. Based on your description, https://github.com/dask/dask-examples/issues/35 sounds quite similar to your workflow. It'd be nice if we could develop that into a fully-formed example.

stsievert commented 5 years ago

https://github.com/dask/distributed/issues/2581 is on training a PyTorch model with Dask.

AlbertDeFusco commented 5 years ago

I've made progress using a Dask DataFrame with the Keras .fit_generator() method to mimic dask_ml.wrappers.Incremental for Scikit-Learn.

https://anaconda.org/defusco/keras-dask/notebook

I'm close to getting dask_ml.wrappers.ParallelPostFit and keras.wrappers.scikit_learn.KerasClassifier working with sklearn pipelines.
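The general shape of that approach might look like the rough sketch below (not the notebook's actual code; DaskPartitionSequence and the column name are made up):

    from tensorflow import keras

    class DaskPartitionSequence(keras.utils.Sequence):
        """Feed one persisted Dask DataFrame partition per Keras batch (illustrative)."""

        def __init__(self, ddf, label_col):
            self.ddf = ddf.persist()      # keep the (transformed) data on the cluster
            self.label_col = label_col

        def __len__(self):
            return self.ddf.npartitions

        def __getitem__(self, i):
            pdf = self.ddf.get_partition(i).compute()
            X = pdf.drop(columns=[self.label_col]).to_numpy()
            y = pdf[self.label_col].to_numpy()
            return X, y

A model would then consume it with model.fit(DaskPartitionSequence(ddf, "label")) (or .fit_generator(...) on older Keras versions), pulling one partition per step.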

TomAugspurger commented 5 years ago

Thanks Albert, looks interesting. I think providing a DaskGenerator, or at least documentation on how to write one, would be very useful.

A couple questions

  1. With your DaskGenerator, if I do gen = DaskGenerator(X, y); gen[0]; gen[1], then I think you'll end up redoing a bunch of computation. Does that sound correct? In this example I think you would end up refitting the entire StandardScaler, splitting, etc. The solution is to persist the transformed data, either in memory on the cluster (perhaps as part of DaskGenerator.__init__) or on disk; the data passed to DaskGenerator would then be loaded from disk. (LMK if I'm not making sense)
  2. Are you using distributed at all? It doesn't look like it. I ask because I've never gotten Keras / Tensorflow to work properly in multiple processes.

AlbertDeFusco commented 5 years ago

@TomAugspurger ,

  1. Yes, it would be good to persist (if possible) any data given to DaskGenerator.

  2. No, I have not attempted this with Distributed.

CMCDragonkai commented 5 years ago

I was thinking of using Dask-ML to replace the multi-GPU model that exists in Keras, so that we would have a dask worker per GPU instead of per core as is usual.

lesteve commented 4 years ago

Not sure what you are trying to do exactly, and I am not an expert on Keras, so I just want to comment on this part:

So that we would have a dask worker per GPU instead of per core as is usual.

There is a way to have one dask worker per GPU by setting the CUDA_VISIBLE_DEVICES environment variable in the worker processes, and people have been doing it for some time (for example, here is a SciPy 2016 video about combining dask and numba). A more recent approach is https://github.com/rapidsai/dask-cuda.
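A minimal sketch of the dask-cuda route (assuming the package is installed and GPUs are visible on the machine):

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # Starts one worker per GPU, each pinned via CUDA_VISIBLE_DEVICES.
    cluster = LocalCUDACluster()
    client = Client(cluster)
    print(client)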

CMCDragonkai commented 4 years ago

Hmm, it's a bit more complicated than that. We would also need a parameter server on the CPU side so that it can accumulate weight updates from all the models. But that would make dask-ml a full ML framework akin to Keras, and instead of going dask -> keras -> tensorflow, it would be dask -> tensorflow.
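For what it's worth, a toy version of such a parameter server can be sketched with Dask actors (everything below is illustrative, not an existing dask-ml API):

    from dask.distributed import Client

    class ParameterServer:
        """Toy parameter server: workers push gradients and pull current weights."""

        def __init__(self, n_weights):
            self.weights = [0.0] * n_weights

        def push(self, grads, lr=0.01):
            self.weights = [w - lr * g for w, g in zip(self.weights, grads)]

        def pull(self):
            return self.weights

    client = Client()                                               # local cluster
    ps = client.submit(ParameterServer, 10, actor=True).result()    # lives on one worker
    ps.push([0.1] * 10).result()                                    # a worker sends an update
    print(ps.pull().result())

A real implementation would batch updates, serve many workers concurrently, and move tensors rather than Python lists, which is where it starts to overlap with what Horovod and torch.distributed already provide.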

skeller88 commented 4 years ago

@corrado9999 , have you developed a good ML data preparation workflow since your question? I have a similar project I'm working on.

The problem I see is that batches are generated randomly, which requires random selections from the dataset. As long as the dataset is not in memory, those selections will be slow. But I think https://www.tensorflow.org/guide/data_performance provides ways around that with prefetching.
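For example, the prefetching pattern from that guide looks roughly like this (placeholder arrays stand in for the real imagery; tf.data.AUTOTUNE is tf.data.experimental.AUTOTUNE on older TensorFlow versions):

    import numpy as np
    import tensorflow as tf

    features = np.random.random((1_000, 8)).astype("float32")   # placeholder pixels
    labels = np.random.randint(0, 5, size=1_000)                # placeholder classes

    dataset = (
        tf.data.Dataset.from_tensor_slices((features, labels))
        .shuffle(buffer_size=1_000)
        .batch(32)
        .prefetch(tf.data.AUTOTUNE)   # prepare the next batches while the model trains
    )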

corrado9999 commented 4 years ago

@skeller88 , no, unfortunately I have not worked on this (yet).

The problem I see is that batches are generated randomly, which requires random selections from the dataset. As long as the dataset is not in memory, those selections will be slow.

Sure, we need a pipeline in order to perform such slow operations while the model is training. The TF API you pointed out is actually very interesting; I am not sure how it could be integrated into this framework, though.

skeller88 commented 4 years ago

Gotcha. I ended up using the tensorflow datasets API directly. It has batch prefetching, which makes performance a bit better. See my code here.

stsievert commented 4 years ago

One straightforward integration would be hooking into TensorBoard's HParams dashboard to visualize how well different parameters performed. Here's a description of basic usage: https://www.tensorflow.org/tensorboard/hyperparameter_tuning_with_hparams

I think this would amount to writing logs in a specific format. Here's how to get some example logs and run TensorBoard on them (commands pulled from the post above):

$ wget -q 'https://storage.googleapis.com/download.tensorflow.org/tensorboard/hparams_demo_logs.zip'
$ unzip -q hparams_demo_logs.zip -d logs/hparam_demo
$ tensorboard --logdir logs/hparam_demo
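Generating such logs from our own hyperparameter runs would look roughly like this (made-up hyperparameter values and score, one log directory per trial):

    import tensorflow as tf
    from tensorboard.plugins.hparams import api as hp

    with tf.summary.create_file_writer("logs/hparam_demo/run-0").as_default():
        hp.hparams({"learning_rate": 0.01, "batch_size": 32})   # this trial's parameters
        tf.summary.scalar("accuracy", 0.91, step=1)             # this trial's result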
stsievert commented 3 years ago

Dask clusters can now be used with PyTorch's distributed framework, thanks to Saturn Cloud's work on https://github.com/saturncloud/dask-pytorch-ddp (see "Getting Started with Distributed Data Parallel" for an example of the PyTorch side). It's similar to the (now archived) dask-tensorflow. From their README,

dask-pytorch-ddp is a Python package that makes it easy to train PyTorch models on Dask clusters using distributed data parallel. The intended scope of the project is

  • bootstrapping PyTorch workers on top of a Dask cluster
  • using distributed data stores (e.g., S3) as normal PyTorch datasets
  • mechanisms for tracking and logging intermediate results, training statistics, and checkpoints.

cc @hhuuggoo and @skirmer.
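Based on that README, usage is roughly a matter of handing a training function to every worker; a sketch (the scheduler address and the empty training body are placeholders):

    from dask.distributed import Client
    from dask_pytorch_ddp import dispatch

    client = Client("tcp://scheduler-address:8786")   # placeholder address

    def train():
        # dask-pytorch-ddp handles the torch.distributed bootstrapping across the
        # workers, so the usual DistributedDataParallel setup happens in here.
        ...

    futures = dispatch.run(client, train)             # start train() on every worker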