timeseriesAI / tsai

Time series Timeseries Deep Learning Machine Learning Python Pytorch fastai | State-of-the-art Deep Learning library for Time Series and Sequences in Pytorch / fastai
https://timeseriesai.github.io/tsai/
Apache License 2.0
5.19k stars 649 forks source link

Training on Larger than Memory Datasets #72

Closed xanderdunn closed 3 years ago

xanderdunn commented 3 years ago

Thanks for providing these implementations, they're very high quality. I've successfully reproduced most of the state of the art results in the self-supervised BERT-like time series transformer paper. All of the datasets used in the paper fit nicely into memory, but many real-world production applications will require iterating over datasets that are too large to fit into memory.

Dataset classes that support larger than memory datasets were proposed as a goal for FastAI 2.3 here. Linked in that discussion is NVTabular's Dataset class, an example of a dataloader that supports larger-than-memory datasets.

Supporting larger-than-memory datasets is probably outside the scope of this project, but I wonder if you could provide any pointers on how one might try to implement this in the existing TSDatasets and TSDataloaders/NumpyDataLoaders classes? I find they are pretty complex, and I'm not familiar with the fastai DataLoaders and Datasets classes that they're subclassing.

The NVTabular Dataset class randomly iterates over the partitions of a dask data frame, which supports multiple files on disk and larger-than-memory datasets. My understanding is that this could be challenging with fastai's training because the __getitem__ call requires random seeking of data, and dask does not support random seeks because it could require reading a different file on disk for each element. I wonder if the __getitem__ could just be tricked in some way to randomly iterate over a partition before moving on to the next random partition. NVTabular is using TorchAsyncIter to iterate over the lazily read dask dataframe.

vrodriguezf commented 3 years ago

Hi,

What about using numpy memmaps?. There's a nice tutorial about it in the library: https://github.com/timeseriesAI/tsai/blob/master/tutorial_nbs/00_How_to_efficiently_work_with_very_large_numpy_arrays.ipynb

xanderdunn commented 3 years ago

Thanks @vrodriguezf! This certainly looks like the right direction. Does memmap support multiple files? I'm not finding an example of it. My datasets, and the datasets NVTabular is designed to handle, are measured in terabytes, so these are broken into many different files. I have about 1000 Avro files for a single dataset. Attempting to create or download the entire dataset as a single file would be difficult. I'm not actually aware of a streaming file writing api in numpy that could turn these 1000 Avro files into a single .npy without consuming too much memory.

xanderdunn commented 3 years ago

Ok it looks like the linked example does actually provide code to stream data to a .npy file with low memory usage:

    path = Path('./data')
    X = create_empty_array((300_000, 50, 512), fname='X_on_disk', path=path, mode='r+')

    chunksize = 10_000
    pbar = progress_bar(range(math.ceil(len(X) / chunksize)))
    start = 0
    for _ in pbar:
        end = start + chunksize
        X[start:end] = np.random.rand(chunksize, X.shape[-2], X.shape[-1]) # type: ignore
        start = end

On a machine with 22.5GB of RAM this successfully creates a 29GB .npy file on disk with the assigned data with maximum resident set of 21.4GB. I could use this to stream multiple .avro files into a single .npy without overflowing memory. This in combination with windowing the time series data on the fly rather than upfront might achieve what I need.

Note that when reading the file mmap_mode='c' gave me the error OSError: [Errno 12] Cannot allocate memory, but mmap_mode='r' worked.

oguiza commented 3 years ago

Hi @xanderdunn, Thanks for your comments. I'm glad you've been able to reproduce most of the TST paper results. Out of curiosity, have you used tsai's TST or TST + TSBERT? I normally use np memmap arrays with datasets of 10-20GB, but I'd like to really understand better how they perform with much larger datasets like the ones you use. I'm very interested to learn whether you've able to use tsai with such big datasets (and it so how), or if you have encountered issues to do it. In that case, I'd be open to discuss potential solutions. Thanks in advance.

xanderdunn commented 3 years ago

Thanks @oguiza. I've been using TSBERT + TST. The self-supervised learning with TSBERT is important to my work. The representations are used in various downstream models. Currently I'm working on windowing the time series data on the fly rather than storing all of the fully formed, windowed data upfront. So, if I have 4 variables and 1,000,000 data points, I simply store a (1000000, 4) array to file. If my window size is 1024, this would balloon to an array size of (998976, 4, 1024). Instead of storing the windowed array to disk or memory, I read the un-windowed data and turn it into windowed samples as needed.

oguiza commented 3 years ago

Thanks for sharing this @xanderdunn. I'm glad to hear TST and TSBERT work well for you. What you mention about creating time series on the fly is really interesting. Your approach makes a lot of sense (in general, but especially for large datasets). I'd really like to better understand your approach to see if I can implement that in tsai to make it more useful. I think it might be possible to do the following:

Is this similar to what you are doing?

xanderdunn commented 3 years ago

@oguiza This is exactly what I have in mind, yeah! I haven't actually implemented it yet. Work has me focusing on other things at the moment, but I will be back on this soon. If you are able to implement this, I will absolutely test it and make use of it.

oguiza commented 3 years ago

Ok. I'm pretty busy at the moment, but I'll try to find some time to assess its feasibility be since it's a great idea. I'll add the assessment to my to-do list. But I can't commit to delivering this soon. Thanks a lot for sharing this @xanderdunn and for volunteering to test it. I'll let you know the outcome of the assessment once I finish it.

oguiza commented 3 years ago

Hi @xanderdunn, I've performed a quick test today to see what would be the impact of applying on-the-fly slicing to numpy arrays, and there's obviously some impact, but it's not as bad as I thought. You may want to check it with a larger dataset though to see whether it'd be acceptable or not. Here's the gist I've used.

xanderdunn commented 3 years ago

This looks great, thank you very much @oguiza! It looks like this iteration takes about 4x the amount of time it takes to iterate an array that's already windowed. I think in terms of the amount of disk space, memory space, and time saved by no longer needing to window the data prior to training, this is a big gain in productivity.

I think it would also be possible to actually require no additional time during training. The next batch could potentially be sliced in the background while the current batch is being trained? Might not be worth introducing that headache right now, but it should be possible. Tensorflow's tf.data does a lot of this prefetching, see here.

xanderdunn commented 3 years ago

I'm working on this again and currently looking at integrating your on-the-fly windowing gist into model training.

I believe this will require a custom data loader? Also I think splits will be indices of window_idxs rather than indices of X? Currently I have:

# Your function
def get_window_indices(o, window_size, start=0, stride=1, seq_first=True):
    """
    Given a numpy array o of time series data that has not yet been windowed,
    return the window indices.
    seq_first should be True if the given o has dimensions (num samples, num variables)
    """
    if o.ndim == 1: 
        o = np.expand_dims(o, 1)
    if seq_first: 
        seq_len = o.shape[0]
    else: 
        seq_len = o.shape[-1]
    max_time = seq_len - start - window_size + 1
    sub_windows = (start + np.expand_dims(np.arange(max_time, step=stride), 0).T + np.expand_dims(np.arange(window_size), 0))
    return sub_windows

def window_indices_and_splits(X, validation_percentage=0.2):
    print("Creating windows...")
    window_idxs = get_window_indices(X, 1024, start=0, stride=1, seq_first=True)
    print(window_idxs.shape)
    validation_percentage = 0.2
    max_train_index = int((1 - validation_percentage) * window_idxs.shape[0])
    splits = (list(np.arange(0, max_train_index)), list(np.arange(max_train_index, window_idxs.shape[0])))
    assert (len(splits[0]) + len(splits[1])) == len(window_idxs)
    assert splits[0][-1] == splits[1][0] - 1
    assert splits[1][-1] == window_idxs.shape[0] - 1
    print("{} samples in train".format(len(splits[0])))
    print("{} samples in validation".format(len(splits[1])))
    return window_idxs, splits

def run_my_dataset():
    X, y = my_data_to_numpy()
    window_idxs, splits = window_indices_and_splits(X, validation_percentage=0.2)
    # TODO: Create dataloader

I believe the next step is to make a data loader subclass that serves batches using the window_idxs.

xanderdunn commented 3 years ago

I think generating the window indices up-front defeats the memory-saving purpose of not windowing the data up-front. On a time series with 15190054 data points and a window size of 1024, get_window_indices will produce an array of size (15189031, 1024), giving the error: MemoryError: Unable to allocate 116. GiB for an array with shape (15189031, 1024) and data type int64. I can upgrade to more than 116GB of RAM, or I can change this to int32, but again this approach doesn't scale well in the size of the dataset. I believe the window indices need to be generated on the fly, not merely the windows themselves.

oguiza commented 3 years ago

Hi @xanderdunn, Yep. I agree it makes sense to create the indices on the fly. Let me share a couple of approaches:

A way to resolve this might be the following:

I believe this approach would be easier to maintain as you get more data in the future. My concern is how fast would this work. I’ll try to find some time to create to test this approach.

xanderdunn commented 3 years ago

Thanks a lot @oguiza, I think both of those approaches are in the right direction.

For the first one, couldn't the train/val overlap be solved by returning the length of the train dataset as (num train data points - window_size)? Then it shouldn't return items beyond the edge of the train set. Some methods propose a larger "embargo" between train and validation sets. I think for self-supervised methods this isn't paramount, but for prediction tasks it's important.

For the second approach, do you mean create a Dataset for each partition of data we have on disk? This would be nice because it removes the need to put all the data into a single monolithic .npy file. However, how do we do shuffled reads of the data without needing to fully read into memory many (or all) different files for each batch? It's possible for an unwindowed dataset to exceed memory as well.

oguiza commented 3 years ago

Today I found so time to work on this issue, and have created a method that allows you to train multiple, large datasets on disk. The proposed method applies window indices on the fly to one or multiple files. One of the keys to working with so larger than RAM datasets is to create one or multiple np.memmap arrays that can be quickly accessed. This proposed approach works well across multiple files simultaneously. It'd be good if you would take a look at this gist and provide feedback to decide the next steps.

xanderdunn commented 3 years ago

Amazing, huge thanks @oguiza! In parallel I was working on the first approach and I have it working. This requires putting the entire dataset into a single .npy file that is memmapped. The window indices are calculated on each __getitem__ call to the Datasets subclass:

from typing import Tuple

import numpy as np

from tsai.all import L, delegates, TSDatasets, TSTensor, TSDataLoaders

def vrange(starts, stops, length):
    """
    Create ranges of integers for multiple start/stop

    Parameters:
        starts (1-D array_like): starts for each range
        stops (1-D array_like): stops for each range (same shape as starts)

    Returns:
        numpy.ndarray: ranges
    """
    stops = np.asarray(stops)
    l = stops - starts # Lengths of each range.
    values_concatenated = np.repeat(stops - l.cumsum(), l) + np.arange(l.sum())
    return values_concatenated.reshape((starts.shape[0], length))

@delegates(TSDatasets.__init__)
class TSUnwindowedDatasets(TSDatasets):
    """
    A dataset that creates tuples from X (and y) and applies `item_tfms`.
    This assumes that X is given with dimensions (num samples, num variables).
    X is unwindowed time series data that needs to be windowed in samples of length window_size.
    """
    _xtype, _ytype = TSTensor, None # Expected X and y output types (torch.Tensor - default - or subclass)
    def __init__(self,
                 X=None,
                 y=None,
                 window_size=None,
                 items=None,
                 sel_vars=None,
                 sel_steps=None,
                 tfms=None,
                 tls=None,
                 n_inp=None,
                 dl_type=None,
                 inplace=True,
                 **kwargs):
        assert window_size is not None
        self.window_size = window_size
        print(f"window_size = {window_size}")
        # TODO: Introduce an embargo between train and validation sets
        super().__init__(X=X,
                         y=y,
                         items=items,
                         sel_vars=sel_vars,
                         sel_steps=sel_steps,
                         tfms=tfms,
                         tls=tls,
                         n_inp=n_inp,
                         dl_type=dl_type,
                         inplace=inplace,
                         **kwargs)

    def __getitem__(self, it) -> Tuple[TSTensor]:
        # Instead of storing the window_idxs, calculate them on-the-fly
        it = np.array(it)
        window_idxs = vrange(it, it + self.window_size, self.window_size)
        item = np.swapaxes(self.X[window_idxs], 1, 2)
        return_item = (TSTensor(item),)
        return return_item

    def subset(self, i):
        print(f"Making a subset with {len(self.splits[i])} indices...")
        new_subset = type(self)(tls=L(tl.subset(i) for tl in self.tls),
                                n_inp=self.n_inp,
                                inplace=self.inplace,
                                tfms=self.tfms,
                                sel_vars=self.sel_vars,
                                sel_steps=self.sel_steps,
                                split=L(self.splits[i]) if self.splits is not None else None,
                                window_size=self.window_size)
        print(f"self.X.shape prior to subsetting: {self.X.shape}")
        new_subset.X = self.X[self.splits[i], :]
        print(f"New subset has X.shape {new_subset.X.shape}")
        return new_subset

    def __len__(self):
        # Prevent calling for item indices that will produce window indices that are out of bounds
        return self.X.shape[0] - self.window_size

def get_ts_unwindowed_dls(X,
                          y=None,
                          splits=None,
                          window_size=None,
                          sel_vars=None,
                          sel_steps=None,
                          tfms=None,
                          inplace=True,
                          path='.',
                          bs=64,
                          batch_tfms=None,
                          num_workers=0,
                          device=None,
                          shuffle_train=True,
                          drop_last=True,
                          **kwargs):
    if splits is None: splits = (L(np.arange(len(X)).tolist()), L([]))
    print("Making datasets...")
    dsets = TSUnwindowedDatasets(X,
                                 y,
                                 splits=splits,
                                 window_size=window_size,
                                 sel_vars=sel_vars,
                                 sel_steps=sel_steps,
                                 tfms=tfms,
                                 inplace=inplace,
                                 **kwargs)
    print("Done making datasets.")
    # print(f"datasets len: {dsets.len}")
    # print(f"datasets vars: {dsets.vars}")
    dls = TSDataLoaders.from_dsets(dsets.train,
                                   dsets.valid,
                                   path=path,
                                   bs=bs,
                                   batch_tfms=batch_tfms,
                                   num_workers=num_workers,
                                   device=device,
                                   shuffle_train=shuffle_train,
                                   drop_last=drop_last,
                                   **kwargs)
    return dls

def make_splits(X: np.ndarray, window_size: int, validation_percentage=0.2) -> Tuple[List[int], List[int]]:
    """
    Given an unwindowed input array X with dimensions (num samples, num variables), return the train/val split indices
    and the window id's.
    """
    print("Making the train/val splits...")
    validation_percentage = 0.2
    num_windows = X.shape[0] - (window_size - 1)
    max_train_index = int((1 - validation_percentage) * num_windows)
    splits = (list(np.arange(0, max_train_index)), list(np.arange(max_train_index, num_windows)))
    assert (len(splits[0]) + len(splits[1])) == num_windows
    assert splits[0][-1] == splits[1][0] - 1
    assert splits[1][-1] == num_windows - 1
    print("{:,} samples in train".format(len(splits[0])))
    print("{:,} samples in validation".format(len(splits[1])))
    return splits

def run_my_dataset():
    dataset_id = "my_data"
    batch_transformers = [TSStandardize(by_sample=True)]
    tfms = [None, TSRegression()]
    device = torch.device("cuda:0") # pylint: disable=no-member
    architecture = TSTPlus
    learning_rate = 1e-4
    num_epochs = 200
    batch_size = 256
    window_size = 512 # FIXME: A window_size > 512 causes this crash: https://github.com/timeseriesAI/tsai/issues/86
    validation_percentage = 0.2

    X, y = get_my_data_monolithic_files()
    splits = make_splits(X, window_size=window_size, validation_percentage=validation_percentage)

    udls = get_ts_unwindowed_dls(X,
                                 splits=splits,
                                 window_size=window_size,
                                 tfms=tfms,
                                 batch_tfms=batch_transformers,
                                 device=device,
                                 bs=batch_size)

    learn = ts_learner(udls,
                       architecture,
                       cbs=[TSBERT(target_dir=f'./data/TSBERT/{dataset_id}', file_name=f'{dataset_id}', dropout=0.1),
                            SaveModelCallback(),
                            TerminateOnNaNCallback(),
                            EarlyStoppingCallback(monitor='valid_loss', min_delta=0.0001, patience=40)],
                       device=device)
    learn.fit_one_cycle(num_epochs, learning_rate)

Some issues:

I think my implementation might be sloppier than yours. I just reverse engineered it from TSDatasets. I will try your implementation now and compare.

xanderdunn commented 3 years ago

I'm trying out your code now, I'm running this:

    dataset_id = "my_data"
    batch_transformers = [TSStandardize(by_sample=True)]
    device = torch.device("cuda:0") # pylint: disable=no-member
    architecture = TSTPlus
    learning_rate = 1e-4
    num_epochs = 200
    batch_size = 200
    window_size = 512 # FIXME: A sample length / window_size > 512 causes this crash:
    #    https://github.com/timeseriesAI/tsai/issues/86
    validation_percentage = 0.2

    X, y = my_data()
    datasets = [WindowDataset(X, y, y_func=None, window_size=window_size, stride=1, seq_first=True)]
    metadataset = MetaDataset(*datasets)
    print(metadataset.vars, metadataset.c, metadataset.len, metadataset[0], len(metadataset))
    splits = TSSplitter(valid_size=validation_percentage, show_plot=False)(metadataset)
    metadatasets = MetaDatasets(metadataset, splits)

    dataloaders = TSDataLoaders.from_dsets(metadatasets.train,
                                           metadatasets.valid,
                                           path='.',
                                           bs=batch_size,
                                           batch_tfms=batch_transformers,
                                           num_workers=0,
                                           device=device,
                                           shuffle_train=True,
                                           drop_last=True)

    learn = ts_learner(dataloaders,
                       architecture,
                       cbs=[TSBERT(target_dir=f'./data/TSBERT/{dataset_id}', file_name=f'{dataset_id}', dropout=0.1)],
                            # WandbCallback(),
                            # SaveModelCallback(),
                            # TerminateOnNaNCallback(),
                            # EarlyStoppingCallback(monitor='valid_loss', min_delta=0.0001, patience=40)],
                       device=device,
                       loss_func=MSELossFlat())
    learn.fit_one_cycle(num_epochs, learning_rate)

A few rough spots:

xanderdunn commented 3 years ago

Ok the last issue I mentioned above disappears when I update to latest master tsai. I'm successfully running your code on larger than memory datasets, both random data and real datasets. I like that your implementation supports both single monolithic files and the initial dataset creation time is also much faster in your implementation. I made some minor changes in this gist to support single files. 👍

xanderdunn commented 3 years ago

Is it possible that the number of train batches is being calculated incorrectly? It looks like it's training for the number of validation batches rather than the number of train batches. I'm also seeing this occur in your gist. It trains for 11717 batches, which is 512 * 11717 = 5,999,104 samples, just under the 5,999,386 samples in the validation set. The train set actually has 23,997,545 samples.

xanderdunn commented 3 years ago

This is happening because the learner calls metadatasets.train, and then calls metadatasets.valid prior to the first epoch. These properties call self.subset, which mutates the self.metadaset.split_idxs. The MetaDataset __lel__ is a function of self.split_idxs. Because the valid property is called second, the metadataset.split_idxs is set to the valid values, so __len__ returns the valid length rather than the train length. Here is a hacky workaround:

from copy import copy

    def subset(self, i):
        new_set = copy(self.metadaset)
        new_set.split_idxs = self.splits[i]
        return new_set

These are shallow copies, so it's not actually using up much more RAM. 4.79GB with the copies vs. 4.44GB without the copies. This fix will have a large impact on the performance numbers in your Colab run.

xanderdunn commented 3 years ago

I'm seeing a large performance difference in runs.

The monolithic .npy implementation I posted above using tsai at commit 75f81e4ab1bda5c6618028b0e96a59e2a9dedd6b does the first epoch on a 13M sample dataset in 5 hours. GPU is consistently 77-89%. The Metadataset implementation on latest master does the first epoch of the same model and dataset in 7 hours. The GPU isn't utilized well. It flips back and forth from 0 to 70%.

I'm currently profiling to see if it's the changes introduced here or if it's the differences in the data loaders.

Your faster dis changes on master branch have a serious impact on this. The same run goes from 7 to 4hr20min on the first epoch because of it! I just had to remove this print statement. However, I still see the GPU usage going to 0. I also just realized that his project has both master and main branches. I believe pip install -U git+https://github.com/timeseriesAI/tsai.git installs the default branch, which is main, so my previous comments on "tsai latest" were referring to main, not master.

xanderdunn commented 3 years ago

I made a change here that loads the masks in background queues. It makes the assumption that the only dimension of the data that ever changes is the batch size, which should be a safe assumption to make. Introducing this change takes my dataset's first epoch train time from 4hr20min to 3hr30min! GPU utilization drops down periodically but is typically above 90%.

I think there's still a performance improvement to be had somewhere. The batch doesn't iterate smoothly, it still has a go and stop behavior where it does ~5 iterations smoothly and then hangs for a moment. I'll try to investigate this. An illustration of the burst and pause behavior: Screen Recording 2021-03-28 at 13 02 49 You can see the GPU goes 95% -> 14% -> 97% -> 0% -> 96% -> 23% -> 95% -> 3% -> 96% -> 16% ...

Mask background prefetch combined with your data loader shuffle improvement make this the fastest run I've seen yet, so I would propose incorporating your MetaDataset into main.

oguiza commented 3 years ago

Hi @xanderdunn, Wow, you've done a lot of good work during the last few days. Great progress!! From my side I've upgraded the code I provided to fix most (all?) of the issues you raised (see gist). I've profiled it as well and it works pretty well. I have not seen any GPU utilization below 75% (I'm not using MVP). To me this would be the code I'd like to add to tsai. It'd be good if you can verify it works as expected. The changes I made to the dataloaders are now available in the github 'main' branch. As to the changes you've made to the MVP code I will test them and will get back to you with feedback. It's great that mask creation can be done in the background. I'd like to learn more about Thread and Queue. Do you know any tutorial that can help me get up to speed on it? My impression is that there may be other areas in tsai that could also be accelerated with this approach. I think we are pretty close to having a pretty fast implementation of both a single and multiple, unwindowed datasets!! Thanks for all the work you are doing with this!

xanderdunn commented 3 years ago

Two bug fixes:

- [ ] I'm not sure what's different in our setups, but I had to change `TSMetaUnwindowedDatasets` `subset`:
```python
#        return type(self.metadaset)(self.metadaset.datasets, split=splits[I]) # Crashes that splits not found
        return type(self.metadaset)(self.metadaset.datasets, split=self.splits[i])

v2 code seems to be equally as performant as the v1 code, first train epoch is still ~3hr30min on my real dataset. With the above two fixes, I think this is ready to be included in main.

Note that TSMetaUnwindowedDataset won't scale well in the number of datasets due to the Python for loops and list comprehensions, but I don't think that needs to be solved right now. This shouldn't be noticeable until tens of thousands of datasets or more, which is not currently an issue for me.

xanderdunn commented 3 years ago

Let me know how the mask background fetching works for you. It provided a big performance boost for my MVP training. Yesterday I applied some bug fixes to it, so when you try it, take a look at the latest code here.

Unfortunately no resources come to mind for learning more about Python threads, I haven't read about them in a while. I'm interested in applying background prefetching to the batches as well. Various projects have done this in pytorch. NVIDIA apex. The pytorch vision library also does batch prefetching.

oguiza commented 3 years ago

I've fixed the 2 issues you've reported, and have cleaned up code a bit. In addition, I'm now saving the mapping_idxs for every batch, so that it's easy to connect any sample in a batch to a dataset and sample id. That's a functionality I always include in tsai. I've also renamed TSMetaUnwindowedDataset/s to TSMetadataset/s, as they are actually useful with any type of dataset, not just the TSUnwindowedDatasets. Here's a gist with the latest changes. I'm planning to release the code in tsai later today or tomorrow. As to your comment about list comprehension, it's true. I've checked it though with 20k smaller datasets, and there's some impact but it's not huge. I believe this is because even if you have thousands of datasets, your batch size is unlikely to be that big, so in each batch you only fetch N=batch size samples. Thanks for the info on Python threads. It'd be good if you would confirm that you can train well with MetaDatasets once I release the code. I'll keep you posted.

oguiza commented 3 years ago

I've just released the code. The class names are TSUnwindowedDataset, TSUnwindowedDatasets, TSMetaDataset, and TSMetaDatasets. They are all in the tsa.data module.

xanderdunn commented 3 years ago

I'm using the latest code on main 272ae62aad02a6554e7f7663a3baca104d2ba8d9 and it's working as expected. Performance looks to be the same.

I've got one more bug fix. The previous iteration of your code included a property vars on TSMetaUnwindowedDataset, but it looks like TSMetaDataset no longer has that. When I do distributed training, this causes a crash. I'm not sure why the property is accessed only in distributed training, but adding this to TSMetaDataset is an easy fix:

class TSMetaDataset():
...
    @property
    def vars(self):
        return self.datasets[0].vars # This is the same for both the single and multiple datasets situation because in both cases it's a list
oguiza commented 3 years ago

Ok, great! Thanks for testing it. I'll this change tomorrow. Could you please confirm what type of test you've done? I understand you are using it with large datasets (OOM), multiple memmap arrays, distributed training, and using MVP. Is this correct? Have you tried this type of setting before? Is it very slow? Does all this improve your performance (accuracy or whatever metrics you use)? Sorry for asking so many questions. I don't need specific details. I'm just trying to get a sense of the type of setting where this functionality works.

xanderdunn commented 3 years ago

I've been testing this on a real dataset with about 13million unwindowed samples. It's 230MB on disk as .npy. Windowing with stride=1 and window_size=512-1024. This is a subset of what's to come. I'm training MVP with subsequence masks using TSTPlus model. I've trained time series datasets this large using supervised WaveNet previously, but this is the first time I'm training datasets this large using Transformers. Yes, I am using pytorch's distributed training classes DistributedDataParallel and DistributedDL to split the batches across multiple GPUs simultaneously. I have yet to determine that the self-supervised pre-training using MVP produces a better outcome in my final tasks. That's my next step.

With two A100s I can see my 13million sample dataset converging in about two days. I should be able to decrease this with 4-8 GPUs. I believe there is still a serious bottleneck somewhere on the CPU side of things because I'm seeing GPU usage jump from 90+% to 0% at regular intervals, as a described above. This issue is even more pronounced on the validation epoch because the forward-only pass makes the GPU even less of a bottleneck there.

Our recent performance improvements have helped a lot with performance:

Thanks again for your interest and your work on this. I'll keep you updated with my results.

oguiza commented 3 years ago

Thanks a lot for the information Xander. I like to understand how tsai is being used, and identify opportunities to further improve it, like this request. I believe the type of setting you describe will become more common as self-supervised training gains momentum. Larger datasets will be used, and parallel training. That’s why I’m very interested in learning more about your experience. I’ve also used MVP, although in smaller settings so far. A couple of things I’ve learned are:

Please, let me know what type of results you get.

Thanks for your help developing all this.

BTW, I still need to test you MVP prefetch code, and will investigate the drop in GPU utilization. Have you been able to confirm if it also occur whe you are not using MVP. (My gut feel is that it’s related to MVP, not the other new code we’ve developed).

xanderdunn commented 3 years ago

Huge thanks for the tips on using InceptionTimePlus and r>0.15. I'll try both and let you know how it goes.

I'll try training a non-MVP model and see what the GPU utilization looks like.

xanderdunn commented 3 years ago

@oguiza I can confirm the GPU under-utilization is specific to MVP. Using the same code and data I just changed the task to supervised TSTPlus training and I'm seeing my A100 at consistent 96-99% usage.

xanderdunn commented 3 years ago

I found one additional minor bug with MetaDataset. When I include the CutMix1D callback in the supervised learner, I get this error:

Traceback (most recent call last):
  File "./pymydata/tsbert_my_data.py", line 249, in <module>
    run_my_dataset(distributed=distributed, resume=False,)
  File "./pymydata/tsbert_my_data.py", line 240, in run_my_dataset
    resume_this_run=False)
  File "./pymydata/tsbert_my_data.py", line 87, in train_supervised
    learn.fit_one_cycle(n_epochs, learning_rate)
  File "/home/xander/dev/fastai/fastai/callback/schedule.py", line 112, in fit_one_cycle
    self.fit(n_epoch, cbs=ParamScheduler(scheds)+L(cbs), reset_opt=reset_opt, wd=wd)
  File "/home/xander/dev/fastai/fastai/learner.py", line 211, in fit
    self._with_events(self._do_fit, 'fit', CancelFitException, self._end_cleanup)
  File "/home/xander/dev/fastai/fastai/learner.py", line 160, in _with_events
    try: self(f'before_{event_type}');  f()
  File "/home/xander/dev/fastai/fastai/learner.py", line 202, in _do_fit
    self._with_events(self._do_epoch, 'epoch', CancelEpochException)
  File "/home/xander/dev/fastai/fastai/learner.py", line 160, in _with_events
    try: self(f'before_{event_type}');  f()
  File "/home/xander/dev/fastai/fastai/learner.py", line 196, in _do_epoch
    self._do_epoch_train()
  File "/home/xander/dev/fastai/fastai/learner.py", line 188, in _do_epoch_train
    self._with_events(self.all_batches, 'train', CancelTrainException)
  File "/home/xander/dev/fastai/fastai/learner.py", line 160, in _with_events
    try: self(f'before_{event_type}');  f()
  File "/home/xander/dev/fastai/fastai/learner.py", line 141, in __call__
    def __call__(self, event_name): L(event_name).map(self._call_one)
  File "/home/xander/anaconda3/envs/my_model/lib/python3.7/site-packages/fastcore/foundation.py", line 154, in map
    def map(self, f, *args, gen=False, **kwargs): return self._new(map_ex(self, f, *args, gen=gen, **kwargs))
  File "/home/xander/anaconda3/envs/my_model/lib/python3.7/site-packages/fastcore/basics.py", line 666, in map_ex
    return list(res)
  File "/home/xander/anaconda3/envs/my_model/lib/python3.7/site-packages/fastcore/basics.py", line 651, in __call__
    return self.func(*fargs, **kwargs)
  File "/home/xander/dev/fastai/fastai/learner.py", line 145, in _call_one
    for cb in self.cbs.sorted('order'): cb(event_name)
  File "/home/xander/dev/fastai/fastai/callback/core.py", line 44, in __call__
    if self.run and _run: res = getattr(self, event_name, noop)()
  File "/home/xander/dev/tsai/tsai/data/mixed_augmentation.py", line 30, in before_train
    self.labeled = True if len(self.dls.tls) > 1 else False
  File "/home/xander/anaconda3/envs/my_model/lib/python3.7/site-packages/fastcore/basics.py", line 388, in __getattr__
    if attr is not None: return getattr(attr,k)
  File "/home/xander/anaconda3/envs/my_model/lib/python3.7/site-packages/fastcore/basics.py", line 388, in __getattr__
    if attr is not None: return getattr(attr,k)
AttributeError: 'TSMetaDataset' object has no attribute 'tls'

I haven't yet had the time to investigate it.

oguiza commented 3 years ago

Hi @xanderdunn, I've identified the root cause of the issue. All fastai datasets have an attribute tls that is used across different areas. In the case of TSUnwindowedDataset we don't have it (as it could be very big and take a lot of memory). That's why there may be some 'unintended' consequences of using the new classes. The issue with CutMix1d and Mixup1d is an easy one I'll fix today (these is part of the tsai code). But there might be other issues related to fastai (that are not under my control, and we might need to patch). I've reviewed were in tsai this attribute is used, and it's pretty limited. Kind of the same thing in fastai. But wanted to give you a heads up that we find some of these issues when you use new functionality with a TSUnwindowedDataset.

xanderdunn commented 3 years ago

Thanks @oguiza that will be very helpful. What is the tls property expected to return? I looked for it in the fastai docs but couldn't find it.

oguiza commented 3 years ago

tls are transformed lists. These are lists of items where transforms (lazy or non-lazy) have been applied. This could for example be lists of img paths, or in the case or TSDatasets in tsai X and y. In the case of TSUnwindowedDataset I don't see what could be the tls, since nothing is calculated (not even the indices that will be applied, that are calculated on the fly).

oguiza commented 3 years ago

I've looked at the low gpu utilization. I think the main reason is that masks are created in the cpu. I've tried to find a way to make it faster in the gpu but unfortunately haven't been able to find a faster way. I've also tested your method to create masks in the background, but have not seen any improvement. I've tested it with different max_prefetch values, but it didn't improve. So I'd rather leave MVP as it is for now. The main area of improvement is the creation of masks in the gpu. I'm sure there must be a way but haven't found it (yet!).

xanderdunn commented 3 years ago

Thanks for taking a look at the GPU utilization. What was your performance testing situation? I don't find that the prefetching is beneficial on small datasets, but I do find benefit on my 13M sample dataset windowed at stride=1. Using an A100 GPU. Batch size 216. window_size 512. 4 variables. With BackgroundGenerator:

Epoch 1/50 : |-----------------------------------------------------------------| 1.13% [637/56257 02:26<3:32:30 0.9182]

Without BackgroundGenerator:

Epoch 1/50 : |-----------------------------------------------------------------| 1.13% [638/56257 02:50<4:08:13 0.9172]

I find that it shaves ~40min off of a 4hr10min epoch. I agree though this doesn't look like the optimal solution to the mask performance problem.

I wonder if this will allow creating the torch.distributions directly on GPU.

xanderdunn commented 3 years ago

This seems to be very performant:

import torch
from torch import distributions

use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
Tensor = FloatTensor

def create_subsequence_mask_torch(o, r=.15):
    if o.ndim == 2: o = o[None]
    n_masks, mask_dims, mask_len = o.shape
    probs = Tensor([1 - r]).repeat((n_masks, mask_dims, mask_len))
    distribution = distributions.binomial.Binomial(1, probs)
    values = distribution.sample()
    return values

Both the probs and the values are created directly on the gpu.

With NO BackgroundGenerator:

Epoch 1/50 : |-----------------------------------------------------------------| 1.13% [638/56257 01:52<2:42:52 0.9215]

I don't see any start and stop behavior, and I see GPU usage at a constant 93-97%. Granted, I haven't implemented the additional functionality of create_subsequence_mask, which includes lm, stateful, and sync. I find that my BackgroundGenerator offers no benefit when paired with the above GPU method of creating the masks.

oguiza commented 3 years ago

Yes, that's the method you get when using MVP with stateful=False. Interestingly I found that it's faster to create the data with numpy and then create the tensor than using the method you described. I used this:

from torch import distributions
r = .15
o = torch.rand(16, 3, 100).cuda()
n_masks, dims, mask_len = o.shape
mask1 = distributions.binomial.Binomial(1, tensor([1 - r], device=default_device()).repeat((*o.shape))).sample()
mask2 = o.new(np.random.binomial(1, 1 - r, (n_masks, dims, mask_len)))
mask1.shape==mask2.shape, mask1.device==mask2.device, mask1.mean(), mask2.mean()
%timeit distributions.binomial.Binomial(1, tensor([1 - r], device=default_device()).repeat((*o.shape))).sample()
%timeit o.new(np.random.binomial(1, 1 - r, (n_masks, dims, mask_len)))

In Colab the 2nd method is 2x faster!

The authors of the A TRANSFORMER-BASED FRAMEWORK FOR MULTI- VARIATE TIME SERIES REPRESENTATION LEARNING paper claim that the stateful (the one currently implemented in tsai by default performs better. If you want you can try it by setting MVP stateful to False and test it with your own data.

xanderdunn commented 3 years ago

Interesting. Our performance results are exactly opposite. My epoch takes 4hr10min using np.random.binomial and it takes 2hr40min with torch.distribution.binormial.Binomial. I think one thing a %timeit test doesn't measure is the time required to move data CPU<>GPU. I think it also doesn't measure the influence of pytorch's computation graph, which may lose performance to the CPU<>GPU transition on every batch. Can you measure the performance of these two approaches on model training and see what the results are?

I see in the paper where they show that the stateful masks performed better, thanks for mentioning it:

Screen Shot 2021-04-04 at 12 37 30
xanderdunn commented 3 years ago

I realized my above performance comparison was comparing apples to oranges. tsai default run uses stateful=True and my pytorch implementation was stateful=False.

Here is my performance test using the default numpy implementation with stateful=False:

Epoch 1/200 : |-----------------------------------------------------------------| 1.14% [639/56257 02:02<2:57:38 0.9119]

I see my GPU at 87-93%. This is 2hr57min (numpy) vs. 2hr43min (pytorch). Very close. It seems the overhead of CPU<>GPU on the numpy implementation evens out with the slower pytorch implementation.

So the stateful=True implementation is considerably less performant. I will try to implement the stateful approach using pytorch distributions and see how it compares.

oguiza commented 3 years ago

Ok. I'll try to test both approaches later today and will let you know.

Yeah, it'd be great if you would create a similar approach for the stateful mode. I haven't been able to find anything that is faster than the current implementation.

oguiza commented 3 years ago

Hi @xanderdunn, In the end, I managed to do it 😅! I've found a way to create masks in cuda using torch distributions and torch repeat_interleave. So now both stateful=False and True are created in cuda. I've also tested the np vs torch creation and torch is faster as you pointed out. I've updated the GitHub repo now with the new code.

Edit: I've been using the code with one of my datasets (100k samples) and it's 2.5x faster than the previous one.

xanderdunn commented 3 years ago

This is huge! Thanks so much for figuring this out. I was staring at repeat_interleave while falling asleep last night but hadn't yet figured it out 😆

I can confirm the performance. With my usual model and dataset, with stateful=True, I see performance:

Epoch 1/200 : |-----------------------------------------------------------------| 1.13% [637/56257 01:51<2:42:33 0.9406]

That's 2hr42min (pytorch) vs. 4hr8m (numpy). Big improvement!

I tried improving performance even more by decorating the create_subsequence_mask function:

@torch.jit.script # type: ignore
@torch.no_grad()

as shown here. However, it appears that the distributions are not a part of TorchScript:

torch.jit.frontend.FrontendError: Cannot instantiate class 'Geometric' in a script function:
  File "/home/xander/anaconda3/envs/my_model/lib/python3.7/site-packages/torch/autograd/grad_mode.py", line 43
        max_len = max(1, 2 * torch.ceil(numels // (1/pm + 1/pu)))
        for i in range(10):
            _dist_a = (torch.distributions.geometric.Geometric(probs=proba_a).sample([max_len])+1).long()
                       ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ <--- HERE
            _dist_b = (torch.distributions.geometric.Geometric(probs=proba_b).sample([max_len])+1).long()
            dist_a = _dist_a if i == 0 else torch.cat((dist_a, _dist_a), dim=-1)

I think I'm satisfied with the MVP implementation now, and performance on my large datasets is sufficient.

oguiza commented 3 years ago

I'm glad you also had a significant improvement with your dataset and model 😃 I understand having a loop is not ideal, but it usually only iterates once. So I'm not too concerned about it. I added it because there are occasions where the random distribution may not create a long enough tensor. And if you want to be sure the tensor is always long enough, you'd need to always create a very long one, which would greatly increase memory. One potential solution to resolve this would be to check if it's long enough, and if it isn't (very unlikely) copy a section of it until it matches the required length. Anyway, additional iterations are very unlikely with a large batch. It's more likely with small batches, but then performance is not so much of a concern. So unless we have any other brilliant idea, I think the current design is good enough. I've started to use it and want to get a better feeling for the impact of some of its parameters.

oguiza commented 3 years ago

I'll close this issue now as there's a solution to use larger than memory datasets in tsai (in a single file or multiple files). Please, feel free to reopen if necessary.