dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.57k stars 715 forks source link

Using dask with PyTorch (train a model) #2581

Closed muammar closed 2 years ago

muammar commented 5 years ago

I find Dask very useful, and would like to use it to accelerate some pytorch models I am working on. A function to be accelerated would be the forward() function. I started doing some tests, but I get errors like these:

  File "/home/muammar/.local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 81, in serialization_error_loads
    raise TypeError(msg)
TypeError: Could not serialize object of type Tensor.
Traceback (most recent call last):
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 144, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 38, in dask_dumps
    header, frames = dumps(x)
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed/protocol/torch.py", line 10, in serialize_torch_Tensor
    header, frames = serialize(t.numpy())
RuntimeError: Can't call numpy() on Variable that requires grad. Use var.detach().numpy() instead.

If I detach the variable, then pytorch does not work with numpy arrays. My question would be -- what is the state of pytorch support? If it is of any help, I would be more than happy to try any code (or even write something if it is under my capabilities) because it would be very useful to have this support in Dask. Thanks for this great python module.

mrocklin commented 5 years ago

My question would be -- what is the state of pytorch support?

It's been as supported as people have made support :) I'm not sure anyone has seen the issue that you've raised here. If you're able to isolate it that would help. I encourage you to find some minimal reproducible example that reproduces the error above. Ideally this looks like the following:

from distributed.protocol import serialize, deserialize

t = # make some pytorch tensor
deserialize(*serialize(t))

That works for most pytorch tensors that we've seen so far (see tests in distributed/protocol/torch.py), so presumably your tensors are different in some way. It would be good to find out how.

muammar commented 5 years ago

Thanks for your reply. I should have inspected in more details the distributed.protocol modules.

It's been as supported as people have made support :)

Yes, this is true :).

I'm not sure anyone has seen the issue that you've raised here. If you're able to isolate it that would help. I encourage you to find some minimal reproducible example that reproduces the error above. Ideally this looks like the following:

I can reproduce the error above with this minimal script:

import torch
import numpy as np
from distributed.protocol import serialize, deserialize

a = np.random.rand(8, 1)

t = torch.tensor(a, requires_grad=True, dtype=torch.float)

deserialize(*serialize(t))

Tensors going in forward() usually have requires_grad=True to make pytorch keep track of them when computing gradients to perform backward propagation for neural networks. Changing requires_grad=False makes dask's serialize, deserialize mechanisms to work. How could it be possible to serialize/deserialize a tensor that has requires_grad=True?

muammar commented 5 years ago

I was inspecting this: https://github.com/dask/distributed/blob/177dfb891089cc872bab34fb8369d75cae13bc0a/distributed/protocol/torch.py I clearly see that tensors with gradients should be taken care of. I will install dask from sources to do some printing and understand better why this happens.

Just for the record:

In [3]: torch.__version__                                                       
Out[3]: '1.0.1.post2'
mrocklin commented 5 years ago

Thanks for the minimal script. That's very helpful to have, even if we can't find a good fix soon.

Yes, I was just about to point you to that file and ask if you had time to investigate. Glad to see that you're a step ahead of me :)

muammar commented 5 years ago

Thanks for being so responsive. Yes, I will go ahead and look into it. I hope I can make it work.

muammar commented 5 years ago

This seems to fix it:

diff --git a/distributed/protocol/torch.py b/distributed/protocol/torch.py
index c25b1549..e31604bc 100644
--- a/distributed/protocol/torch.py
+++ b/distributed/protocol/torch.py
@@ -7,7 +7,7 @@ import numpy as np

 @dask_serialize.register(torch.Tensor)
 def serialize_torch_Tensor(t):
-    header, frames = serialize(t.numpy())
+    header, frames = serialize(t.detach().numpy())
     if t.grad is not None:
         grad_header, grad_frames = serialize(t.grad.numpy())
         header['grad'] = {'header': grad_header, 'start': len(frames)}

Should I prepare a PR for this?

mrocklin commented 5 years ago

Sure! Do we know if this has any negative impact, for example on performance?

cc @stsievert

muammar commented 5 years ago

Another thing to understand is -- what detach() is actually doing. Like I wonder if this would affect the graphs and perform pytorch computations correctly. I will try to do some experiments about that. Regarding performance, I don't know exactly how that would be affected.

stsievert commented 5 years ago

Regarding performance, I don't know exactly how that would be affected.

I don't see how performance would be effected either. Well, not re-attaching the gradient would provide faster computation but would return an incorrect result because the gradients wouldn't be recorded.

[how] detach ... affect the graphs and perform pytorch computations correctly.

I think a better method would be to call the requires_grad_ function on the Tensor:

x = torch.tensor([0, 0, 0], requires_grad=True, dtype=torch.float32)
print(x)  # tensor([0., 0., 0.], requires_grad=True)
x.detach_()
print(x)  # tensor([0., 0., 0.])
x.requires_grad_()
print(x)  # tensor([0., 0., 0.], requires_grad=True)

These return self, so x is x.detach_().

muammar commented 5 years ago

@stsievert I was finishing reading this https://pytorch.org/docs/stable/autograd.html#torch.Tensor.detach and I think I agree with you.

To serialize we should .detach_() instead of .detach().

diff --git a/distributed/protocol/torch.py b/distributed/protocol/torch.py
index c25b1549..46c93812 100644
--- a/distributed/protocol/torch.py
+++ b/distributed/protocol/torch.py
@@ -1,5 +1,5 @@
 from .serialize import (serialize, dask_serialize, dask_deserialize,
-        register_generic)
+                        register_generic)

 import torch
 import numpy as np
@@ -7,7 +7,7 @@ import numpy as np

 @dask_serialize.register(torch.Tensor)
 def serialize_torch_Tensor(t):
-    header, frames = serialize(t.numpy())
+    header, frames = serialize(t.detach_().numpy())
     if t.grad is not None:
         grad_header, grad_frames = serialize(t.grad.numpy())
         header['grad'] = {'header': grad_header, 'start': len(frames)}
@@ -22,7 +22,8 @@ def deserialize_torch_Tensor(header, frames):
     if header.get('grad', False):
         i = header['grad']['start']
         frames, grad_frames = frames[:i], frames[i:]
-        grad = dask_deserialize.dispatch(np.ndarray)(header['grad']['header'], grad_frames)
+        grad = dask_deserialize.dispatch(np.ndarray)(header['grad']['header'],
+                                                     grad_frames)
     else:
         grad = None

Now, in deserialize_torch_Tensor I see .requires_grad_ is already being used when device == 'cpu' or we set requires_grad=header['requires_grad'] otherwise.

I will go ahead and prepare a PR where .detach_() is used to create a leaf.

muammar commented 5 years ago

After applying https://github.com/dask/distributed/pull/2586, I am able to use dask.delayed in the forward() function of my package. However, the pytorch computational graph breaks (note that loss function and metric error are constant) :

Screenshot from 2019-03-29 14-08-46

I am investigating why that is. Basically, the autograd is not tracking the gradient of the outputs.

stsievert commented 5 years ago

How are you performing the distributed optimization? I'm familiar with two choices:

What does your optimization code look like?

muammar commented 5 years ago

In the picture above, I was doing something very naive -- I just abstracted from forward() a function that I decorated as @dask.delayed. It seems to me that the pytorch optimizer(?) is not aware of what is going on.

I also did another naive attempt that would be related to training in batches (using chunks). I just stashed the changes that created the problem above, and I decorated forward() as @dask.delayed and using just one batch, I was able to see that the computations returned from forward() for that batch seemed correct but loss function does not change at each epoch because again either autograd or the optimizer do not track them :(. I have to look at dask_ml.Incremental to understand how the optimizer can be transferred.

I had my project public, but in the national lab where I work, I was told I have to first discuss the terms of licensing and those stuff before putting it public again. However, I have something in this gist so that you have an idea!

I never heard about using a parameter server! that sounds interesting. I will read it. Thank you very much for these discussions, I really appreciate.

muammar commented 5 years ago

@stsievert today I stumbled into this:

Pytorch's DataParallel

The picture above is a schematic representation of what DataParallel does in Pytorch. I changed my code so that I created one chunk containing all data just to see if dask.compute(*chunk) could do training with it. I instantiated the Pytorch model, and created a train() function (independent from the pytorch class) that is marked as delayed. When I call .compute over that batch, the training works! I think that it might be totally possible to do a distributed training if I change my code further following what is explained here.

The structure would be:

  1. Create a list with batches of data points that I need for training the model.
  2. A delayed train function that operates over the batches.
  3. Accumulate the gradients returned by the delayed functions.
  4. Call optimizer.step().

Does this make sense? I guess that doing this could make training ~n times faster.

stsievert commented 5 years ago

Yup, that makes sense. I think the image is describing a simple parameter server. One worker (GPU-1) deploys the model to every other worker, gathers all the gradients, and then takes an optimization step (then repeats).

What does your train function look like?

muammar commented 5 years ago

What does your train function look like?

This is how it looks like without dask after I have changed it to use batches:

def train(inputs, targets, model=None, data=None, optimizer=None, lr=None,
          weight_decay=None, regularization=None, epochs=100, convergence=None,
          lossfxn=None, device='cpu', n_batches=None):
    """Train the model

    Parameters
    ----------
    inputs : dict
        Dictionary with hashed feature space.
    epochs : int
        Number of full training cycles.
    targets : list
        The expected values that the model has to learn aka y.
    model : object
        The NeuralNetwork class.
    data : object
        DataSet object created from the handler.
    lr : float
        Learning rate.
    weight_decay : float
        Weight decay passed to the optimizer. Default is 0.
    regularization : float
        This is the L2 regularization. It is not the same as weight decay.
    convergence : dict
        Instead of using epochs, users can set a convergence criterion.
    lossfxn : obj
        A loss function object.
    device : str
        Calculation can be run in the cpu or cuda (gpu).
    n_batches : int
        Number of batches to use for training. Default is None.
    """

    initial_time = time.time()

    # old_state_dict = {}

    # for key in model.state_dict():
    #     old_state_dict[key] = model.state_dict()[key].clone()

    atoms_per_image = data.atoms_per_image

    n_batches = 5

    # If user requires chunks, we do them all here.
    if isinstance(n_batches, int):
        chunks = list(get_chunks(inputs, n_batches, svm=False))
        targets = list(get_chunks(targets, n_batches, svm=False))
        atoms_per_image = list(get_chunks(atoms_per_image, n_batches,
                                          svm=False))

    atoms_per_image = torch.tensor(atoms_per_image, requires_grad=False,
                                   dtype=torch.float)
    targets = torch.tensor(targets, requires_grad=False)

    if device == 'cuda':
        logger.info('Moving data to CUDA...')

        atoms_per_image = atoms_per_image.cuda()
        targets = targets.cuda()
        _inputs = OrderedDict()
        for hash, f in inputs.items():
            _inputs[hash] = []
            for features in f:
                symbol, vector = features
                _inputs[hash].append((symbol, vector.cuda()))

        inputs = _inputs

        move_time = time.time() - initial_time
        h, m, s = convert_elapsed_time(move_time)
        logger.info('Data moved to GPU in {} hours {} minutes {:.2f} seconds.'
                    .format(h, m, s))

    # Define optimizer
    if optimizer is None:
        optimizer = torch.optim.Adam(model.parameters(), lr=lr,
                                     weight_decay=weight_decay)

    logger.info('{:6s} {:19s} {:12s} {:8s} {:8s}'.format(
                                                   'Epoch',
                                                   'Time Stamp',
                                                   'Loss',
                                                   'RMSE/img',
                                                   'RMSE/atom'))
    logger.info('{:6s} {:19s} {:12s} {:8s} {:8s}'.format(
                                                   '------',
                                                   '-------------------',
                                                   '------------',
                                                   '--------',
                                                   '---------'))

    _loss = []
    _rmse = []
    epoch = 0

    while True:
        epoch += 1
        optimizer.zero_grad()  # clear previous gradients

        if n_batches is None:
            # If no batches are given.
            outputs, loss = train_batches(inputs, targets, model, optimizer,
                                          lossfxn, atoms_per_image, device)
        else:
            losses = []
            outputs_ = []
            for index, chunk in enumerate(chunks):
                outputs, loss = train_batches(index, chunk, targets, model,
                                              optimizer, lossfxn,
                                              atoms_per_image, device)
                losses.append(loss)
                outputs_.append(outputs)

        loss = sum(losses)

        loss.backward()
        optimizer.step()
        # RMSE per image and per/atom
        rmse = []
        rmse_atom = []
        for index, chunk in enumerate(outputs_):
            rmse.append(torch.sqrt(torch.mean((chunk - targets[index]).pow(2))).item())

            # RMSE per atom
            atoms_per_image_ = atoms_per_image[index]
            outputs_atom = chunk / atoms_per_image_
            targets_atom = targets[index] / atoms_per_image_
            rmse_atom.append(torch.sqrt(torch.mean((outputs_atom - targets_atom).pow(2))).item())

        rmse = sum(rmse)
        rmse_atom = sum(rmse_atom)

        _loss.append(loss.item())
        _rmse.append(rmse)

        ts = time.time()
        ts = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d '
                                                          '%H:%M:%S')
        logger.info('{:6d} {} {:8e} {:8f} {:8f}' .format(epoch, ts, loss, rmse,
                                                         rmse_atom))

        if convergence is None and epoch == epochs:
            break
        elif (convergence is not None and rmse < convergence['energy']):
            break

    training_time = time.time() - initial_time

    h, m, s = convert_elapsed_time(training_time)
    logger.info('Training finished in {} hours {} minutes {:.2f} seconds.'
                .format(h, m, s))
    logger.info('outputs')
    logger.info(outputs)
    logger.info('targets')
    logger.info(targets)

    import matplotlib.pyplot as plt
    plt.plot(list(range(epoch)), _loss, label='loss')
    plt.plot(list(range(epoch)), _rmse, label='rmse / image')
    plt.legend(loc='upper left')
    plt.show()

    parity(outputs.detach().numpy(), targets.detach().numpy())

def train_batches(index, chunk, targets, model, optimizer, lossfxn,
                  atoms_per_image, device):
    """A function that allows training per batches"""
    inputs = OrderedDict(chunk)
    outputs = model(inputs)

    if lossfxn is None:
        loss = MSELoss(outputs, targets[index], optimizer,
                       atoms_per_image[index], device=device)
    else:
        raise('I do not know what to do')

    return outputs, loss

What I am trying to do is to add train_batches()) (see below) as delayed after thewhile True:` (where training starts):

outputs, loss = train_batches(index, chunk, targets, model,
                                              optimizer, lossfxn,
                                              atoms_per_image, device)

When I use dask the loss function does not change. What could be wrong?

muammar commented 5 years ago

To be more explicit, this is how I adapt the code above with Dask.

diff --git a/mlchem/models/neuralnetwork.py b/mlchem/models/neuralnetwork.py
index accca5a..4fbac37 100644
--- a/mlchem/models/neuralnetwork.py
+++ b/mlchem/models/neuralnetwork.py
@@ -276,14 +276,24 @@ def train(inputs, targets, model=None, data=None, optimizer=None, lr=None,
         else:
             losses = []
             outputs_ = []
+
+            computations = []
             for index, chunk in enumerate(chunks):
-                outputs, loss = train_batches(index, chunk, targets, model,
-                                              optimizer, lossfxn,
-                                              atoms_per_image, device)
-                losses.append(loss)
-                outputs_.append(outputs)
+                computations.append(train_batches(index, chunk, targets, model,
+                                                  optimizer, lossfxn,
+                                                  atoms_per_image, device))
+                #losses.append(loss)
+                #outputs_.append(outputs)
+
+            results = dask.compute(*computations, scheduler='distributed')
+

+
+        for result in results:
+            losses.append(result[0])
+            outputs_.append(result[1])
+
         loss = sum(losses)

         loss.backward()
@@ -291,6 +301,7 @@ def train(inputs, targets, model=None, data=None, optimizer=None, lr=None,
         # RMSE per image and per/atom
         rmse = []
         rmse_atom = []
+
         for index, chunk in enumerate(outputs_):
             rmse.append(torch.sqrt(torch.mean((chunk - targets[index]).pow(2))).item())

@@ -336,6 +347,7 @@ def train(inputs, targets, model=None, data=None, optimizer=None, lr=None,
     parity(outputs.detach().numpy(), targets.detach().numpy())

+@dask.delayed
 def train_batches(index, chunk, targets, model, optimizer, lossfxn,
                   atoms_per_image, device):
     """A function that allows training per batches"""
@@ -348,4 +360,4 @@ def train_batches(index, chunk, targets, model, optimizer, lossfxn,
     else:
         raise('I do not know what to do')

-    return outputs, loss
+    return loss, outputs
stsievert commented 5 years ago

When I use dask the loss function does not change. What could be wrong?

What do you mean "the loss function does not change"?

I think I'd expect the gradients to be calculated on each worker, not the master node. That's the main computational bottleneck; it takes about 2 or 3 times longer than the forward pass. I'd expect something like an image mentioned later in the post:

Screen Shot 2019-04-02 at 9 24 43 AM

This way GPU-1 does a lot less of the work. Here's the code I'd expect for that:

@dask.delayed
def calculate_gradients(model, x, y):
    predictions = model.forward(x)
    loss = MSELoss(predictions, y)
    loss.backwards()
    return model.grad

model = Net(...)
optimizer = SGD(model.parameters())
while not converged:
    futures = [calculate_gradients(batch) for batch in batches]
    grads = dask.compute(*futures)
    # copy grads to model.parameters
    optimizer.step()
muammar commented 5 years ago

When I use dask the loss function does not change. What could be wrong?

What do you mean "the loss function does not change"?

I mean that forward propagation is working, but calling loss.backward() and optimizer.step() is not updating the parameters giving as a consequence a loss function that does not decrease (nor increase) with each epoch.

I think I'd expect the gradients to be calculated on each worker, not the master node. That's the main computational bottleneck; it takes about 2 or 3 times longer than the forward pass. I'd expect something like an image mentioned later in the post:

I think you are right with this statement. I still am not very familiar with distributed computing but am learning a lot with dask and these discussions.

Screen Shot 2019-04-02 at 9 24 43 AM

This way GPU-1 does a lot less of the work. Here's the code I'd expect for that:

@dask.delayed
def calculate_gradients(model, x, y):
    predictions = model.forward(x)
    loss = MSELoss(predictions, y)
    loss.backwards()
    return model.grad

model = Net(...)
optimizer = SGD(model.parameters())
while not converged:
    futures = [calculate_gradients(batch) for batch in batches]
    grads = dask.compute(*futures)
    # copy grads to model.parameters
    optimizer.step()

Let me try to implement this and will get back to this thread. Thanks for taking the time to reply to these rookie questions :).

muammar commented 5 years ago

@stsievert it seems to be workingggg!

works

I will do some more experiments and tests and will write back!

Edit: Will it be better to use futures instead of delayed for this case?

muammar commented 5 years ago

Sorry for all the noise. It seems that using futures is way faster. I did the following:

  1. Removed @dask.delayed from the calculate_gradients function.
  2. Use dask.distributed.get_client() to get the current client.
  3. For each batch, call calculate_gradients with client.submit and append those futures in a list called accumulation.
  4. Wait for the computations dask.distributed.wait(accumulation).
  5. Gather them accumulation = client.gather(accumulation).

I wonder -- is this optimal thing to do in practice?

stsievert commented 5 years ago

using futures is way faster

Hm... I'd expect Dask and Dask Distributed to be equally fast in this use case because the graph is simple.

I'm guessing you're getting a warning on the scheduler about "large object of size X detected in the task graph", which likely refers to the train data or the model. I think I'd do something like

def calculate_grads(X, Y, model=None):
    out = model.forward(X)
    loss = ...
    loss.backwards()
    return model.grad

# train data: (X, Y)
X_futures = [client.scatter(x) for x in X]  # or chunks in your case
y_futures = [client.scatter(y) for y in Y]  # or index in your case I think

grad_futures = client.map(calculate_grads, X_futures, Y_futures, model=model)
# or client.submit and a for-loop in your case
muammar commented 5 years ago

using futures is way faster

Hm... I'd expect Dask and Dask Distributed to be equally fast in this use case because the graph is simple.

Thanks for your reply. In this case, I get 1 more epoch per second when I use futures over delayed:

 # Futures
 8529 2019-04-03 14:13:27 1.208903e-03 0.643312 0.033830
 8530 2019-04-03 14:13:28 1.210656e-03 0.644287 0.033891
 8531 2019-04-03 14:13:29 1.204191e-03 0.642115 0.033751
 8532 2019-04-03 14:13:29 1.208181e-03 0.643377 0.033854
 8533 2019-04-03 14:13:29 1.207316e-03 0.642983 0.033796
 8534 2019-04-03 14:13:30 1.208588e-03 0.643498 0.033872
 8535 2019-04-03 14:13:30 1.204606e-03 0.641842 0.033747
 8536 2019-04-03 14:13:31 1.202783e-03 0.642535 0.033790
 8537 2019-04-03 14:13:31 1.201063e-03 0.640601 0.033690
 8538 2019-04-03 14:13:31 1.197524e-03 0.641050 0.033691

# Delayed
  318 2019-04-03 14:19:00 2.663904e-01 9.697880 0.510414
  319 2019-04-03 14:19:01 2.661483e-01 9.693430 0.510178
  320 2019-04-03 14:19:01 2.660170e-01 9.691009 0.510051
  321 2019-04-03 14:19:02 2.657659e-01 9.685952 0.509802
  322 2019-04-03 14:19:03 2.655579e-01 9.682105 0.509597
  323 2019-04-03 14:19:03 2.653371e-01 9.678014 0.509378
  324 2019-04-03 14:19:04 2.651287e-01 9.674046 0.509185
  325 2019-04-03 14:19:05 2.649343e-01 9.670558 0.508991
  326 2019-04-03 14:19:06 2.647445e-01 9.667180 0.508811
  327 2019-04-03 14:19:06 2.645498e-01 9.663593 0.508629

Is futures using all resources I specify with cluster = LocalCluster(n_workers=16, threads_per_worker=2) and client = Client(cluster)? I think I am confused about futures and delayed. With delayed you create this list of delayed functions and you call dask.compute(list, scheduler='distributed') however, with futures one just does client.submit(). I would say that even though with futures one does not declare the scheduler, both are working in a "distributed manner" because they should in principle use all resources I declared on my client above. Is that correct?

I'm guessing you're getting a warning on the scheduler about "large object of size X detected in the task graph", which likely refers to the train data or the model. I think I'd do something like

I am getting these warnings when doing other models like kernel ridge.

def calculate_grads(X, Y, model=None):
    out = model.forward(X)
    loss = ...
    loss.backwards()
    return model.grad

# train data: (X, Y)
X_futures = [client.scatter(x) for x in X]  # or chunks in your case
y_futures = [client.scatter(y) for y in Y]  # or index in your case I think

grad_futures = client.map(calculate_grads, X_futures, Y_futures, model=model)
# or client.submit and a for-loop in your case

Thanks for this snippet! I was wondering how to use client.scatter.

stsievert commented 5 years ago

Your question rephrased:

Are Dask and Dask Distributed using the same resources if I create a Client?

Depends on the configuration and the computation being performed. https://docs.dask.org/en/latest/setup/single-distributed.html By default, a threaded scheduler will be created. If a distributed scheduler is created with Client, it uses processes by default (which means there are some complications with data sharing).

That documentation page also explicitly mentions

For different computations you may find better performance with particular scheduler settings. This document helps you understand how to choose between and configure different schedulers, and provides guidelines on when one might be more appropriate.

I only considered the computational graph in https://github.com/dask/distributed/issues/2581#issuecomment-479650294. I should have considered more: the data and computation.

I am getting these warnings when doing other models like kernel ridge.

I would suspect your model is too large but not the training data.

muammar commented 5 years ago

@stsievert yesterday, after your reply, I went again over the distributed documentation. It is clear to me now that futures are using whatever client I define (as you wrote). I always use dask distributed clients and therefore all futures are going to be working with processes. That matches with the things I see in the dashboard.

So far the approach described in this issue seems to be working well. The last remaining problem I am trying to solve has to do with the use of submit or .compute within forward(). When using one of those inside forward(), that makes loss.backward() to return None gradients. I reported that here. In the code I am working, forward() receives a dictionary. I think that if I would use .submit for each iteration in the dict, then any resource that is idle should get a task to be done. I don't know if that is correct but would like to give it a try.

muammar commented 5 years ago

using futures is way faster

Hm... I'd expect Dask and Dask Distributed to be equally fast in this use case because the graph is simple.

I'm guessing you're getting a warning on the scheduler about "large object of size X detected in the task graph", which likely refers to the train data or the model. I think I'd do something like

def calculate_grads(X, Y, model=None):
    out = model.forward(X)
    loss = ...
    loss.backwards()
    return model.grad

# train data: (X, Y)
X_futures = [client.scatter(x) for x in X]  # or chunks in your case
y_futures = [client.scatter(y) for y in Y]  # or index in your case I think

grad_futures = client.map(calculate_grads, X_futures, Y_futures, model=model)
# or client.submit and a for-loop in your case

You are right about this. I am trying to implement client.scatter and even with the changes done in the PR I submitted I am getting this error:

  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed-1.26.1+11.g668e872a-py3.7.egg/distributed/comm/utils.py", line 50, in to_frames
    res = yield offload(_to_frames)
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed-1.26.1+11.g668e872a-py3.7.egg/distributed/comm/utils.py", line 43, in _to_frames
    context=context))
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed-1.26.1+11.g668e872a-py3.7.egg/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed-1.26.1+11.g668e872a-py3.7.egg/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed-1.26.1+11.g668e872a-py3.7.egg/distributed/protocol/serialize.py", line 163, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type Tensor.', 'tensor([-14.5868730545, -14.5640010834, -14.5292606354, -14.4874134064,\n        -14.4443883896, -14.4063940048, -14.3788156509, -14.3651800156,\n        -14.3665180206, -14.3813629150])')

I am reading the source code to understand better this problem.

stsievert commented 5 years ago

The last remaining problem I am trying to solve has to do with the use of submit or .compute within forward(). When using one of those inside forward(), that makes loss.backward() to return None gradients.

What value does using the client inside forward provide? I think the client operates on one node in the computational graph, so I think I expect None gradients. Can the model run forward on a single machine/worker? I'd expect that to work best.

muammar commented 5 years ago

The last remaining problem I am trying to solve has to do with the use of submit or .compute within forward(). When using one of those inside forward(), that makes loss.backward() to return None gradients.

What value does using the client inside forward provide? I think the client operates on one node in the computational graph, so I think I expect None gradients. Can the model run forward on a single machine/worker? I'd expect that to work best.

I created this post in Discuss Pytorch with a small working example. The very same error I get in that toy example is what I get when I implement it on my package :(

After what you wrote I stumbled into this. I quote from Pieter Noordhuis:

The DataParallel module is pretty straightforward: it splits the input up into N chunks (typically across the first dimension), runs the same forward pass on N replicas of your model, and gathers the output back into a single tensor (across the same dimension as the input was split). Gradients are always accumulated in the source model (not the replicas).

In my case, I also suspect the gradients are not being accumulated in the source model.

muammar commented 5 years ago

using futures is way faster

Hm... I'd expect Dask and Dask Distributed to be equally fast in this use case because the graph is simple.

I'm guessing you're getting a warning on the scheduler about "large object of size X detected in the task graph", which likely refers to the train data or the model. I think I'd do something like

def calculate_grads(X, Y, model=None):
    out = model.forward(X)
    loss = ...
    loss.backwards()
    return model.grad

# train data: (X, Y)
X_futures = [client.scatter(x) for x in X]  # or chunks in your case
y_futures = [client.scatter(y) for y in Y]  # or index in your case I think

grad_futures = client.map(calculate_grads, X_futures, Y_futures, model=model)
# or client.submit and a for-loop in your case

I changed my training class to look just like this. However, when the model is too big, I still get the warning that the object is too large. In that case, should the model be scattered as well? Have you done that before? I did a couple of tries but that did not work for me.

mrocklin commented 5 years ago

Checking in. Has the original issue here been resolved?

Mmiglio commented 5 years ago

Hi all! This could be a stupid question, but why don't we use Dask to initialize distributed Pytorch processes? Pytorch comes with a nice distributed package, but you need manually to initialize the processes and eventually partition data between workers. My thought is that Dask can help with this. I would do it in the following way

from dask.distributed import Client
import torch.distributed as dist
import torch
import os

client = Client(local_dir="/tmp")

rank = [client.scatter(rank) for rank in range(2)]
world_size = [client.scatter(2) for _ in range(2)]
#x = [client.scatter(x) for x in np.split(data, 2)]

def run(rank, size, backend='gloo'):
    """
    Initialize the distributed environment
    """
    os.environ['MASTER_ADDR'] = "127.0.0.1" #if using multiple workers, set the address of the worker with rank = 0
    os.environ['MASTER_PORT'] = "23456"
    os.environ['WORLD_SIZE'] = str(size)
    os.environ['RANK'] = str(rank)

    ## Init distributed
    print("Waiting other workers...")
    dist.init_process_group(
        init_method="env://",
        backend='gloo'
    )

    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])
    return "rank %s"%rank

futures = client.map(run, rank, world_size)
print(client.gather(futures))

Now, instead of sending/receiving tensors you can train the model.
I probably did something stupid, this is the first time I'm using Dask :)

TomAugspurger commented 5 years ago

@Mmiglio that sounds somewhat similar to what we do with dask-tensorflow.

An example of how to use dask and pytorch together, which could perhaps live in dask-examples, would be welcome.

stsievert commented 5 years ago

@Mmiglio that's a good idea. I'd love to see it expanded.

muammar commented 5 years ago

@Mmiglio This might be a stupid question but -- what might be the advantage of the method you proposed with respect to the one that is discussed above? The issue I am getting is that if the model is too big, I get some dask warnings. Will your proposition get that fixed? I would appreciate any input.

Mmiglio commented 5 years ago

You can find an example here https://github.com/Mmiglio/alpacaTorch/blob/master/train.py Good thing is that it seems to work! This is the training loss for a bunch of epochs

Screenshot 2019-05-22 at 09 09 42

In the next few days I will try to do something less hardcoded and test other things. If anyone has any idea on how to improve the Dask part, ping me :)

@muammar Well, the method I proposed should be more scalable: If I understood correctly, with your method you are using Dask workers to compute gradients. These gradients are then aggregated by the master, computed new weights and then sent back to workers to update the model, am I right? If the answer is positive, then what you are doing is "implementing a single parameter server" (kind of) and this can be a bottleneck. The other method is using ring-all-reduce (implemented in distributedDataParallel in pytorch) to aggregate weight, hence no communication with the master node is involved. Also you are not using Dask to comunicate weight, because all the communications between workers are managed by Pytorch dist module, which is highly optimized. Concerning the model: this should fix it, because you are creating models inside workers, am I wrong? I not, consider that you are sending your model/weights to workers at most once, that is when you first instantiate the model. Please tell me if I said something really stupid, because this is very likely.

mrocklin commented 5 years ago

I think that using Dask to bootstrap PyTorch's distributed computing platform is likely a highly pragmatic approach. As @TomAugspurger this was done for tensorflow. It was also done for XGBoost in dask-xgboost, and is currently being upstreamed in https://github.com/dmlc/xgboost/pull/4473 .

Rather than use submit and map I encourage people to step a little bit outside of the task scheduling framework and use run to ensure that all workers participate and that they participate evenly. If someone wants to take this on I would encourage them to look at the existing solutions and then speak up early about early work. I think that people here will be happy to advise.

muammar commented 5 years ago

You can find an example here https://github.com/Mmiglio/alpacaTorch/blob/master/train.py Good thing is that it seems to work! This is the training loss for a bunch of epochs

Screenshot 2019-05-22 at 09 09 42

In the next few days I will try to do something less hardcoded and test other things. If anyone has any idea on how to improve the Dask part, ping me :)

@muammar Well, the method I proposed should be more scalable: If I understood correctly, with your method you are using Dask workers to compute gradients. These gradients are then aggregated by the master, computed new weights and then sent back to workers to update the model, am I right?

Just for the sake of clarification, what the method above does is:

  1. Split the data in chunks.
  2. Send to each worker a batch of data with the model.
  3. forward/backward passes in the workers, accumulate the gradients and return them to master.
  4. I use a client.gather to get them and sum them up.
  5. Update the gradient parameters in master.
  6. Finally .optimizer.step().
  7. Repeat until convergence.

If the answer is positive, then what you are doing is "implementing a single parameter server" (kind of) and this can be a bottleneck. The other method is using ring-all-reduce (implemented in distributedDataParallel in pytorch) to aggregate weight, hence no communication with the master node is involved. Also you are not using Dask to comunicate weight, because all the communications between workers are managed by Pytorch dist module, which is highly optimized. Concerning the model: this should fix it, because you are creating models inside workers, am I wrong? I not, consider that you are sending your model/weights to workers at most once, that is when you first instantiate the model. Please tell me if I said something really stupid, because this is very likely.

It makes sense. Let me know how I could help with this. I am working in a package for chemistry and materials science where I implemented the distributed training method discussed previously. I would be glad to implement this new approach and make tests but I might have some questions :). In this way, we could get something done to be added in dask distributed! What do you think?

By the way, will the diagnostic dashboard be able to show the status of things?

Mmiglio commented 5 years ago

I would love to create a package with this :)

By the way, will the diagnostic dashboard be able to show the status of things?

To be honest I don't know the answer, but tomorrow I will try and see if something happens.

stsievert commented 5 years ago

I would love to create a package with this :)

I'd love to see this too! This sounds pragmatically useful.

implementing a single parameter server" ... this can be a bottleneck. ... The other method is using ring-all-reduce

Depends on the problem. For large models, certainly. But wouldn't this be managed by PyTorch? All Dask would do is setup the network, right?

I have a draft implementation of the parameter server/ring-all-reduce schemes above for Dask at https://github.com/stsievert/dask-ps (though they're referred to as "centralized" and "decentralized" respectively, and also far too general).

By the way, will the diagnostic dashboard be able to show the status of things?

I think that'd be useful too (for train/test score, etc). In the decentralized/ring-all-reduce case I think it could be done (maybe with bit of delay) with the per-worker resource plugin

Mmiglio commented 5 years ago

But wouldn't this be managed by PyTorch? All Dask would do is setup the network, right?

Indeed with the method I proposed everything would be managed by PyTorch. As you said what Dask would do is set up the network and partition data.

I have a draft implementation of the parameter server/ring-all-reduce schemes above for Dask at https://github.com/stsievert/dask-ps (though they're referred to as "centralized" and "decentralized" respectively, and also far too general).

I like what you did here. Basically you are implementing torch.distributed using Dask, am I right? It would be great to compare the performance of this two approaches.

muammar commented 4 years ago

@stsievert and @mrocklin I am submitting a paper about a package I built for ML in atomistic simulations and as you helped me getting the PyTorch model working with dask, I would like to add you in the acknowledgment section. Is that ok with you? If you do not want me to add your name I understand. Best.

stsievert commented 4 years ago

I would like to add you in the acknowledgment section.

@muammar no objections here! Thank you.

mrocklin commented 4 years ago

Thank you for thinking of us @muammar . Are you able to share the article?

muammar commented 4 years ago

Thank you for thinking of us @muammar . Are you able to share the article?

I have shared it with you and @stsievert. You should have it in your emails.

sribkain commented 4 years ago

@muammar Can I get a copy too please?

muammar commented 4 years ago

@muammar Can I get a copy too please?

The preprint can be found here: https://chemrxiv.org/articles/ML4Chem_A_Machine_Learning_Package_for_Chemistry_and_Materials_Science/11952516 :)

GenevieveBuckley commented 2 years ago

The preprint can be found here: https://chemrxiv.org/articles/ML4Chem_A_Machine_Learning_Package_for_Chemistry_and_Materials_Science/11952516 :)

And @muammar's github repository with the ML4Chem library is here: https://github.com/muammar/ml4chem

@muammar - would you be interested in sharing this on the dask blog? We sometimes have posts on that sort of thing, and I think it'd be interesting for many people (here's a post on Dask with Pytorch Nick wrote earlier this year).

GenevieveBuckley commented 2 years ago

(I'm going to close this issue, since it seems you have it working well now, but that won't stop you being able to leave comments on the conversation here, fyi)