DiffEqML / torchdyn

A PyTorch library entirely dedicated to neural differential equations, implicit models and related numerical methods
https://torchdyn.org
Apache License 2.0
1.4k stars, 130 forks

CPU Parallelization #115

Closed syndamn closed 2 years ago

syndamn commented 3 years ago

Hello, I was wondering if there is any way to parallelize training across CPU cores with this package? When I tried to use torch.multiprocessing, it warned me about a conflict between fork and autograd.

Zymrael commented 3 years ago

This all depends on what you'd like to do specifically. PyTorch already parallelizes ops across CPU cores afaik. Some stuff here. Are you trying to train different models across different CPU nodes?

Could you also share a minimal example?
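For reference, a minimal sketch of how to inspect and cap the thread pool PyTorch uses to parallelize a single CPU op. The matrix sizes are arbitrary and only for illustration; nothing here is specific to torchdyn.

```python
import torch

# Number of threads PyTorch uses to parallelize a single op on CPU.
default_threads = torch.get_num_threads()
print("intra-op threads:", default_threads)

# A large matmul is dispatched to a multithreaded backend with no extra code.
a = torch.randn(512, 512)
b = torch.randn(512, 512)
c_multi = a @ b

# Restrict to one thread, e.g. inside a worker process, so that several
# processes do not oversubscribe the same cores.
torch.set_num_threads(1)
c_single = a @ b

# Restore the original setting.
torch.set_num_threads(default_threads)
```

If you do launch several processes yourself, capping each one with `torch.set_num_threads(1)` keeps them from competing for the same cores.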

syndamn commented 3 years ago

My goal is to use torch.multiprocessing to train a neural ODE by passing multiple initial conditions/trajectories to separate workers. I think I have worked around the autograd error, but I'm still new to multiprocessing and would definitely appreciate any feedback on whether I'm doing it correctly.

Here is the example code I am using. Thank you in advance for the help! In this, dyn is a custom nn.Module and Learner is a pl.LightningModule.

import time

import pytorch_lightning as pl
import torch
from torchdyn.core import NeuralODE

# dyn, device, t_span and trainloader are defined elsewhere (not shown)
ctx = torch.multiprocessing.get_context('spawn')

def Worker(ix, t_span, trainloader):
    torch.set_num_threads(1)  # one thread per worker
    print("Starting training for worker ", ix)
    model = NeuralODE(dyn, sensitivity='adjoint', solver='tsit5', interpolator=None, atol=1e-3, rtol=1e-3).to(device)
    nde = Learner(t_span, model)
    trainer = pl.Trainer(min_epochs=10, max_epochs=20)
    trainer.fit(nde, train_dataloader=trainloader)
    print("Finishing training for worker ", ix)

def overall_process(num_workers):
    workers = []
    for ix in range(num_workers):
        # use the 'spawn' context created above instead of the default start method
        worker = ctx.Process(target=Worker, args=(ix, t_span, trainloader))
        workers.append(worker)
    for w in workers:
        w.start()
    for worker in workers:
        worker.join()
    print("Finished Training")
    print(" ")

if __name__ == '__main__':
    start = time.time()
    overall_process(2)
    print("Time taken: ", time.time() - start)
    print(" ")

----------------Learner --------------------

class Learner(pl.LightningModule):
    def __init__(self, t_span:torch.Tensor, model:nn.Module):
        super().__init__()
        self.model, self.t_span = model, t_span

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x = batch[0][0]
        criterion = nn.MSELoss()
        t_eval, y_hat = self.model(x, self.t_span)
        y_hat = y_hat[-1]  # select last point of solution trajectory
        # correct_pos_vel is the target state, defined elsewhere (not shown)
        loss = criterion(y_hat, correct_pos_vel)
        return {'loss': loss}

    def configure_optimizers(self):
        return torch.optim.Adam(self.model.parameters(), lr=0.01)

    def train_dataloader(self):
        return trainloader

------------------------ nn.module

class customDynamicsNet(torch.nn.Module):
    def __init__(self, D_in, H, tan_out, D_out):
        """
        In the constructor we instantiate two nn.Linear modules and assign them as
        member variables.
        """
        super(customDynamicsNet, self).__init__()
        self.linear1 = torch.nn.Linear(D_in, H)
        self.tanh = torch.nn.Tanh()
        self.linear2 = torch.nn.Linear(tan_out, D_out)

    def forward(self, x):
        """
        In the forward function we accept a Tensor of input data and we must return
        a Tensor of output data. We can use Modules defined in the constructor as
        well as arbitrary operators on Tensors.
        """
        ## in: pos/vel # out: vel/accel
        # NOTE: x1 (the rescaled input) is computed but not used below;
        # linear1 receives the raw x.
        x1 = torch.cat((x[0:3] / 7000, x[3:6]), 0)
        h_relu = self.linear1(x)
        tanh = self.tanh(h_relu)
        y_pred = self.linear2(tanh)
        return torch.cat((y_pred[3:6], y_pred[0:3] / 100), 0)

Zymrael commented 3 years ago

Not sure what the use case would be, as all methods and classes here can already solve in parallel batches of initial conditions. Are you looking for more fine-grained control on which initial conditions go on which core?
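As a rough illustration of what "parallel batches of initial conditions" means, here is a minimal sketch that uses a hand-written fixed-step Euler loop in plain PyTorch as a stand-in for a real solver. The vector field `f`, the helper `euler_solve`, and all sizes are hypothetical and are not part of torchdyn.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Hypothetical vector field: 6-dimensional state in, 6-dimensional derivative out.
f = nn.Sequential(nn.Linear(6, 32), nn.Tanh(), nn.Linear(32, 6))

def euler_solve(f, x0, t_span):
    """Fixed-step explicit Euler. x0 has shape (batch, dim)."""
    xs = [x0]
    x = x0
    for t0, t1 in zip(t_span[:-1], t_span[1:]):
        # One call to f advances every initial condition in the batch at once.
        x = x + (t1 - t0) * f(x)
        xs.append(x)
    return torch.stack(xs)  # shape: (len(t_span), batch, dim)

x0 = torch.randn(8, 6)             # 8 initial conditions, one per row
t_span = torch.linspace(0, 1, 11)  # 11 time points, 10 Euler steps
traj = euler_solve(f, x0, t_span)
print(traj.shape)  # torch.Size([11, 8, 6])
```

Each row of `x0` is integrated independently, but all rows go through the same tensor ops, so PyTorch's CPU threading applies to the whole batch without explicit worker processes.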

syndamn commented 3 years ago

Yes! It would also be super helpful if you could explain how the parallel batching works. When using the process above, is the model training on three (or some number of) separate initial conditions simultaneously and updating its weights based on the combined loss over those initial conditions? I just want to verify whether it is possible to train the model using multiple ICs, with each IC set having access to the learning results of the others.

Hopefully that makes sense, thank you for the help :)

Zymrael commented 3 years ago

Sure, the different ICs here are your batch of data in the machine learning sense: the tensor dimension dedicated to batching carries the solution for each IC. This means that, if needed, you can use these values at any time during training (for example, to compute normalization statistics) or only at the end, to compute the mean value of some target loss function.
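To make the "combined loss" point concrete, here is a minimal sketch showing that one mean loss over the batch gives the same weight gradient as averaging the gradients of the per-IC losses. The linear `model` and random `target` are hypothetical stand-ins for the solved Neural ODE and its training targets.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

model = nn.Linear(6, 6)      # hypothetical stand-in for the solved Neural ODE
x0 = torch.randn(4, 6)       # batch of 4 initial conditions
target = torch.randn(4, 6)   # hypothetical per-IC targets
criterion = nn.MSELoss()     # averages over every element in the batch

# One loss over the whole batch, one gradient for the shared weights.
batch_loss = criterion(model(x0), target)
batch_grad = torch.autograd.grad(batch_loss, model.weight)[0]

# The same gradient assembled from the individual per-IC losses.
per_ic_grads = []
for i in range(x0.shape[0]):
    loss_i = criterion(model(x0[i:i + 1]), target[i:i + 1])
    per_ic_grads.append(torch.autograd.grad(loss_i, model.weight)[0])
mean_grad = torch.stack(per_ic_grads).mean(dim=0)

print(torch.allclose(batch_grad, mean_grad, atol=1e-5))
```

So every optimizer step already combines information from all ICs in the batch through the shared weights.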

CPU parallelization of data samples within a batch is not a torchdyn-specific feature; it comes from PyTorch. You can find plenty of in-depth information online if you need to dig further into its implementation.