christophmark / pytorch-autocomplete

Tutorial for developing a topic-specific autocomplete function for Jupyter notebooks based on PyTorch and Google Colaboratory.
MIT License
36 stars 5 forks source link

training is slow, is it possible to add multi-gpu ? #2

Open ghost opened 4 years ago

ghost commented 4 years ago

Hi,

Hope you are all well !

I tried to train pytorch-autocomplete on a 200 MB file and it is quite slow.

Is it possible to add PyTorch's DataParallel or DistributedDataParallel to speed up the process? And, most of all, is there any particular reason why it is so slow?

Ref. https://pytorch.org/tutorials/beginner/former_torchies/parallelism_tutorial.html

Thanks in advance for your reply on that topic.

Cheers, X

christophmark commented 4 years ago

Are you training on Google Colab or locally on your own machine? On Google Colab you need to make sure that GPU acceleration is activated. Still, training may take several hours.

ghost commented 4 years ago

On my local machine, with 3 GeForce GTX 1080 GPUs.

Is there anything that can be done to speed it up so that we can train on larger datasets?

Thanks in advance for your time.

ghost commented 4 years ago

That's what I have done so far

#!/usr/bin/env python
# coding: utf-8

import torch
print('torch.cuda.is_available',torch.cuda.is_available())

import io
import os
import re
import unidecode
import lxml.html
from time import sleep
import numpy as np

# smallest file size (in bytes) that is taken into account in code search
MINSIZE = 30000

# base name (without extension) of the file holding all obtained data
OUTNAME = 'autocompletion_papers'

# folder that contains the output file
OUTDIR = './data'

# read the complete text file into one string
with io.open(OUTDIR+'/'+OUTNAME+'.txt', mode="r", encoding="utf-8") as f:
    data = f.read()

# print three 300-character excerpts of the text, separated by a ruler line
for _offset in (0, 100000, 300000):
    if _offset:
        print('\n--------------------------------------------------------------------------------\n')
    print(data[_offset:_offset + 300])

from datetime import datetime
import numpy as np

import torch
from torch import nn
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.multiprocessing as mp
import torch.distributed as dist

def get_chars(filepath):
    """
    Returns a sorted list of all distinct characters present in a file.

    The file is decoded as UTF-8 (the same encoding load_data uses) instead of
    the platform default, and the result is sorted so that the order is the
    same on every run (the iteration order of a set of strings is not).
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = f.read()

    return sorted(set(data))

def load_data(filepath, chars=None):
    """
    Opens a UTF-8 text file and encodes every character as an integer index.

    filepath: path of the text file to read.
    chars:    optional sequence of characters defining the vocabulary; the index of a
              character in this sequence is its code. If None, the vocabulary is the
              sorted tuple of all distinct characters in the file, so the mapping is
              identical on every run.

    Returns (chars, encoded), where encoded is a 1-D int64 numpy array with one
    entry per character of the file.

    Raises KeyError if `chars` is given and the file contains a character that is
    not part of it.
    """
    with open(filepath, mode="r", encoding="utf-8") as f:
        data = f.read()

    if chars is None:
        chars = tuple(sorted(set(data)))

    # lookup table for encoding
    char2int = {ch: ii for ii, ch in enumerate(chars)}

    # encoding; fromiter fills the array directly instead of building a Python list first,
    # and the explicit dtype keeps an empty file from producing a float array
    encoded = np.fromiter((char2int[ch] for ch in data), dtype=np.int64, count=len(data))

    return chars, encoded

def one_hot_encode(arr, n_labels):
    """
    One-hot encoding for character-data.

    arr:      integer numpy array of label indices; any number of dimensions is accepted
              (the element count is taken from arr.size, not from a 2-D shape).
    n_labels: size of the vocabulary, i.e. length of the one-hot axis.

    Returns a float32 array of shape (*arr.shape, n_labels) that is 1 at the position
    given by each label and 0 elsewhere.
    """
    # One row per element of arr, initially all zeros
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)

    # Fill the appropriate elements with ones
    one_hot[np.arange(arr.size), arr.ravel()] = 1.

    # Finally reshape it to get back to the original array layout
    return one_hot.reshape((*arr.shape, n_labels))

def get_batches(arr, n_seqs, n_steps):
    """
    Generator over mini-batches: yields (x, y) pairs, each of shape (n_seqs x n_steps),
    where y holds the characters of x shifted one position to the left.
    """
    chars_per_batch = n_seqs * n_steps
    usable = (len(arr) // chars_per_batch) * chars_per_batch

    # drop the tail that cannot fill a whole batch and lay the rest out as n_seqs rows
    rows = arr[:usable].reshape((n_seqs, -1))
    row_len = rows.shape[1]

    for start in range(0, row_len, n_steps):
        stop = start + n_steps

        # features (sequence of characters)
        x = rows[:, start:stop]

        # targets (the next character after each position)
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        if stop < row_len:
            y[:, -1] = rows[:, stop]
        else:
            # the last window has no following column, so wrap around to the first one
            y[:, -1] = rows[:, 0]

        yield x, y

class CharRNN(nn.Module):
    def __init__(self, tokens, n_hidden=256, n_layers=2, drop_prob=0.5):
        """
        Character-level recurrent network: a stack of LSTM layers, Dropout on the LSTM
        output, and a linear layer that maps back to the size of the vocabulary.
        """
        super().__init__()

        # vocabulary and lookup tables between characters and their integer codes
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: idx for idx, ch in enumerate(self.chars)}

        # hyper-parameters (also stored in checkpoints)
        self.n_hidden = n_hidden
        self.n_layers = n_layers
        self.drop_prob = drop_prob

        # layers
        vocab_size = len(self.chars)
        self.dropout = nn.Dropout(drop_prob)
        self.lstm = nn.LSTM(vocab_size, n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)
        self.fc = nn.Linear(n_hidden, vocab_size)

    def forward(self, x, hidden):
        """
        Runs x through LSTM, Dropout and the linear layer; returns the output together
        with the new hidden state.
        """
        lstm_out, hidden = self.lstm(x, hidden)
        scores = self.fc(self.dropout(lstm_out))
        return scores, hidden

    def predict(self, char, hidden=None, device=torch.device('cpu'), top_k=None):
        """
        Predicts the character following `char`. Returns the predicted character and the
        hidden state; a character outside the vocabulary yields '' and the unchanged
        hidden state.
        """
        with torch.no_grad():
            self.to(device)

            # unknown characters cannot be encoded
            if char not in self.char2int:
                return '', hidden

            code = np.array([[self.char2int[char]]])
            inputs = torch.from_numpy(one_hot_encode(code, len(self.chars))).to(device)

            out, hidden = self.forward(inputs, hidden)

            # probabilities over the vocabulary, moved to the CPU for numpy
            probs = F.softmax(out, dim=2).detach().cpu()

            # candidate characters: the whole vocabulary, or only the top_k most likely ones
            if top_k is None:
                candidates = np.arange(len(self.chars))
            else:
                probs, candidates = probs.topk(top_k)
                candidates = candidates.numpy().squeeze()

            if top_k == 1:
                # a single candidate is taken as is
                choice = int(candidates)
            else:
                # otherwise sample among the candidates according to their re-normalized probability
                weights = probs.numpy().squeeze()
                choice = np.random.choice(candidates, p=weights / weights.sum())

            return self.int2char[choice], hidden

def save_checkpoint(net, opt, filename, train_history=None):
    """
    Save trained model to file.

    net:           the model, either bare or wrapped in a container that exposes the real
                   model as `.module` (as DataParallel / DistributedDataParallel do).
    opt:           optimizer whose state_dict is stored alongside the model.
    filename:      path of the checkpoint file to write.
    train_history: optional dict with the training history; an empty dict is stored
                   when omitted.
    """
    # unwrap parallel containers so the stored state_dict keys carry no 'module.' prefix
    model = getattr(net, 'module', net)

    checkpoint = {'n_hidden': model.n_hidden,
                  'n_layers': model.n_layers,
                  'state_dict': model.state_dict(),
                  'optimizer': opt.state_dict(),
                  'tokens': model.chars,
                  # a fresh dict per call instead of a shared mutable default argument
                  'train_history': {} if train_history is None else train_history}

    with open(filename, 'wb') as f:
        torch.save(checkpoint, f)

def load_checkpoint(filename):
    """
    Restore a model from a checkpoint file; returns the model and the raw checkpoint dict.
    """
    # tensors are mapped to the CPU regardless of the device they were saved from
    with open(filename, 'rb') as handle:
        state = torch.load(handle, map_location='cpu')

    # rebuild the network with the stored vocabulary and layer sizes, then load its weights
    model = CharRNN(state['tokens'],
                    n_hidden=state['n_hidden'],
                    n_layers=state['n_layers'])
    model.load_state_dict(state['state_dict'])

    return model, state

# Rendezvous address and port read by torch.distributed when the process group is
# initialized. setdefault keeps values that were already exported by the user or a
# launcher instead of overwriting them.
os.environ.setdefault('MASTER_ADDR', 'localhost')
os.environ.setdefault('MASTER_PORT', '12355')

def _batch_to_tensors(x, y, n_chars, device):
    """
    Turn one (x, y) mini-batch from get_batches into one-hot float inputs and integer
    (long) targets, both placed on `device`.
    """
    inputs = torch.from_numpy(one_hot_encode(x, n_chars)).to(device)
    targets = torch.tensor(y, dtype=torch.long).to(device)
    return inputs, targets


def _validation_loss(model, criterion, val_data, n_seqs, n_steps, n_chars, device):
    """
    Mean loss of `model` over all full batches of `val_data`, computed without tracking
    gradients. Returns NaN if val_data does not contain a single full batch. The caller
    is responsible for switching the model to eval mode.
    """
    losses = []
    with torch.no_grad():
        val_h = None
        for x, y in get_batches(val_data, n_seqs, n_steps):
            inputs, targets = _batch_to_tensors(x, y, n_chars, device)
            output, val_h = model(inputs, val_h)
            val_loss = criterion(output.reshape(-1, n_chars), targets.reshape(-1))
            losses.append(val_loss.item())

    return float(np.mean(losses)) if losses else float('nan')


def train(rank, world_size, net, data, epochs=10, n_seqs=10, n_steps=50, lr=0.001, clip=5, val_frac=0.1, device=torch.device('cpu'),
          name='checkpoint', early_stop=True, plot=False):
    """
    Training loop for one worker process of a DistributedDataParallel job
    (intended to be started once per worker, e.g. via torch.multiprocessing.spawn).

    rank:       index of this worker (0 .. world_size-1).
    world_size: total number of workers.
    net:        the bare (not yet wrapped) model; must expose `.chars`.
    data:       1-D numpy array of encoded characters.
    epochs:     maximal number of training epochs.
    n_seqs:     number of sequences per mini-batch (per worker).
    n_steps:    number of characters per sequence.
    lr:         learning rate of the Adam optimizer.
    clip:       maximal gradient norm (gradient clipping).
    val_frac:   fraction at the end of `data` that is held out for validation.
    device:     'cpu' or 'cuda' device; for 'cuda' every worker uses the GPU with
                index rank % torch.cuda.device_count().
    name:       checkpoint path without the '.net' suffix.
    early_stop: stop when the validation loss has not improved for more than 10 epochs.
    plot:       unused; kept for backward compatibility of the signature.

    Raises ValueError if the training shard of a worker is too small for one mini-batch.
    """
    device = torch.device(device)
    on_gpu = device.type == 'cuda'
    if on_gpu:
        # every process drives exactly one GPU
        device = torch.device('cuda', rank % torch.cuda.device_count())
        torch.cuda.set_device(device)

    # NCCL is the backend meant for GPU tensors; it needs a separate GPU for every worker.
    # Gloo is used in all other cases.
    use_nccl = on_gpu and dist.is_nccl_available() and world_size <= torch.cuda.device_count()

    # initialize the process group
    dist.init_process_group('nccl' if use_nccl else 'gloo', rank=rank, world_size=world_size)
    try:
        net.to(device)  # move neural net to this worker's GPU/CPU memory
        opt = torch.optim.Adam(net.parameters(), lr=lr)  # initialize optimizer
        criterion = nn.CrossEntropyLoss()  # initialize loss function

        # create training and validation data
        val_idx = int(len(data) * (1 - val_frac))
        train_data, val_data = data[:val_idx], data[val_idx:]

        # Every worker trains on its own slice. All slices have the same length, so all
        # workers run the same number of steps (gradient synchronization would block
        # forever if one worker ran fewer steps than the others).
        shard_len = len(train_data) // world_size
        train_data = train_data[rank * shard_len:(rank + 1) * shard_len]
        if shard_len < n_seqs * n_steps:
            raise ValueError(
                'training shard of {} characters per worker is smaller than one batch '
                '(n_seqs * n_steps = {})'.format(shard_len, n_seqs * n_steps))

        n_chars = len(net.chars)  # get size of vocabulary

        # the wrapper synchronizes gradients between workers during backward()
        ddp_net = DDP(net, device_ids=[device.index] if on_gpu else None)

        min_val_loss = 10.**10  # initialize minimal validation loss
        train_history = {'epoch': [], 'step': [], 'loss': [], 'val_loss': []}

        # main loop over training epochs
        for e in range(epochs):
            ddp_net.train()  # (re-)enable dropout after the validation of the previous epoch
            hidden = None  # reset hidden state after each epoch

            # loop over batches
            for x, y in get_batches(train_data, n_seqs, n_steps):
                inputs, targets = _batch_to_tensors(x, y, n_chars, device)

                # reset gradient information
                opt.zero_grad()

                # generate network output; the call must go through the DDP wrapper,
                # otherwise the gradients of the workers are not synchronized
                output, hidden = ddp_net(inputs, hidden)

                # compute loss
                loss = criterion(output.reshape(-1, n_chars), targets.reshape(-1))

                # compute (and synchronize) gradients
                loss.backward()

                # gradient clipping to prevent exploding gradients
                nn.utils.clip_grad_norm_(ddp_net.parameters(), clip)

                # optimize
                opt.step()

                # prevent backpropagating through the entire training history
                # by detaching hidden state and cell state
                hidden = (hidden[0].detach(), hidden[1].detach())

            # validation is done in eval mode (no dropout) on the bare model
            ddp_net.eval()
            mean_val_loss = _validation_loss(net, criterion, val_data, n_seqs, n_steps, n_chars, device)

            # All workers must take identical checkpoint/early-stopping decisions, so the
            # value computed by worker 0 is used everywhere.
            synced = torch.tensor([mean_val_loss], dtype=torch.float64, device=device)
            dist.broadcast(synced, src=0)
            mean_val_loss = synced.item()

            # track progress (plain Python floats keep the checkpoint free of numpy scalars)
            last_loss = loss.item()
            train_history['epoch'].append(e+1)
            train_history['loss'].append(last_loss)
            train_history['val_loss'].append(mean_val_loss)

            # print training progress (once, not once per worker)
            if rank == 0:
                print("{}   Epoch: {:.0f}/{:.0f}   Loss: {:.4f}   Val Loss: {:.4f}".format(
                    datetime.now().strftime('%H:%M:%S'),
                    e+1, epochs,
                    last_loss,
                    mean_val_loss))

            # save model checkpoint if validation loss has decreased;
            # only worker 0 writes so that the workers do not overwrite each other's file
            if mean_val_loss < min_val_loss:
                if rank == 0:
                    save_checkpoint(ddp_net, opt, name+'.net', train_history=train_history)
                min_val_loss = mean_val_loss

            # if validation loss has not decreased for the last 10 epochs, stop training
            if early_stop:
                if e - np.argmin(train_history['val_loss']) > 10:
                    if rank == 0:
                        print('Validation loss does not decrease further, stopping training.')
                    break
    finally:
        # release the process group even if training fails
        dist.destroy_process_group()

def main():
    """
    Load the training text, build the network and start one training process per GPU.

    At least one process is started, so a machine without any GPU launches a single
    worker instead of spawning none.
    """
    # use GPU if available, else use CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("device:", device)

    n_gpus = torch.cuda.device_count()
    if n_gpus > 1:
        print("Let's use", n_gpus, "GPUs!")

    # load data
    chars, data = load_data('./data/autocompletion_papers.txt')

    # create RNN
    net = CharRNN(chars, n_hidden=256, n_layers=3)

    # one worker per GPU, but never fewer than one
    world_size = max(n_gpus, 1)

    # mp.spawn passes the worker index (rank) as first argument to train; the tuple holds
    # the remaining positional arguments:
    # world_size, net, data, epochs, n_seqs, n_steps, lr, clip, val_frac, device, name, early_stop, plot
    mp.spawn(train, nprocs=world_size,
             args=(world_size, net, data, 500, 768, 64, 0.001, 5, 0.1, device,
                   './data/training-autocompletion_papers', True, False))

if __name__ == '__main__':
    main()

# train
# rank, world_size, net, data, 500, 768, 256, 0.001, 5, 0.1, device, './data/training-autocompletion_papers', True, False
# train(net, data, epochs=500, n_seqs=768, n_steps=256, lr=0.001, device=device, val_frac=0.1, name='./data/training-autocompletion_papers', plot=False)