ContinualAI / avalanche

Avalanche: an End-to-End Library for Continual Learning based on PyTorch.
http://avalanche.continualai.org
MIT License
1.79k stars 291 forks source link

Possible Replay bug #543

Closed Mattdl closed 3 years ago

Mattdl commented 3 years ago

Hi all,

I'm working on implementing Class-Balanced and Experience-balanced Replay (See #479). I want to provide the opportunity for the user to select a fixed capacity prior or adaptive to all seen experiences so far.

However, when looking at the Replay code I find it hard to follow:

single_task_mem_size = min(self.mem_size, len(curr_data))
h = single_task_mem_size // (strategy.training_exp_counter + 1)

remaining_example = single_task_mem_size % (
    strategy.training_exp_counter + 1)
# We recover it using the random_split method and getting rid of the
# second split.
rm_add, _ = random_split(
    curr_data, [h, len(curr_data) - h]
)

In the first line: https://github.com/ContinualAI/avalanche/blob/9cf3c53d83ffc1b3dfe4400d43356ebcd901cfc9/avalanche/training/plugins/replay.py#L64 What if len(curr_data) is smaller than self.mem_size? It would occur to me that 'h' is not correct anymore. Even though the current data length len(curr_data) may not equal the full memory size, it can still have capacity for a mem_size/n_observed_exps portion of the memory. So h should become self.mem_size // (strategy.training_exp_counter + 1)?

When taking the random_split however, then we should take the 'min' operation into account:

 h2 = min(capacity_per_exp, len(curr_data))
  rm_add, _ = random_split(
      curr_data, [h2, len(curr_data) - h2]
  )

Am I missing something here or is this a bug?

AntonioCarta commented 3 years ago

If I understand correctly the bug happens only whenever len(curr_data) < mem_size and len(curr_data) > (mem_size / (strategy.training_exp_counter + 1)). Definitely a bug, we should fix it and add a test to cover this corner case.

vlomonaco commented 3 years ago

Thanks @Mattdl for reporting, if you have already a general fix in place, would you be able to tackle this opening a PR? thanks :)

Mattdl commented 3 years ago

Yes sure, I'll add the fix as soon as possible! (: I'm also implementing the ExperienceBalancedStoragePolicy and ClassBalancedStoragePolicy. Should I incorporate those at once?

AntonioCarta commented 3 years ago

A single PR is ok.

Mattdl commented 3 years ago

I'm finishing up on the PR, but one issue remains if you want to update the replay memory in each minibatch. Retrieval in replay is now defined using the MultiTaskJoinedBatchDataLoader in before_training_exp to have balanced replay. So when the memory changes during the learning of an experience, these are not yet available for retrieval.

I tried the following in the before forward:

    mem_dataloader = MultiTaskMultiBatchDataLoader(
        AvalancheConcatDataset(self.replay_mem.values()),
        oversample_small_tasks=False,
        num_workers=num_workers,
        batch_size=strategy.train_mb_size,  # A batch of replay samples
        shuffle=shuffle
    )

    mb_x, mb_y, mb_task_id = next(iter(mem_dataloader))[0]
    mb_x, mb_y = mb_x.to(strategy.device), mb_y.to(strategy.device)

    # Add to current batch
    strategy.mb_x = torch.cat((strategy.mb_x, mb_x))
    strategy.mb_y = torch.cat((strategy.mb_y, mb_y))

This works for a while, but eventually gives a RecursionError: maximum recursion depth exceeded in comparison error. As in https://github.com/pytorch/pytorch/issues/1917 dataloaders shouldn't be short-lived objects, probably causing this. Any thoughts on how we can make this efficient for frequent (batch-wise) updates? Keep in mind that the replay memory is a dictionary with values being AvalancheDatasets.

vlomonaco commented 3 years ago

Maybe you can try replace the dataset inside the data loader object "manually" and reset its state instead of creating a new data loader? Just guessing. Maybe @AntonioCarta and @lrzpellegrini have better insights.

AntonioCarta commented 3 years ago

Why do you need to update the buffer during training? I don't think this is needed if your benchmark is set up correctly. Basically, whenever you receive new data (a new experience), you update the replay buffer and you're done.

If I understand what you want to do, you are assuming a scenario where data (an experience) arrives in mini-batches. I think the best way to model this setting is to split your stream accordingly before calling train. If you do this, your strategy becomes easier to implement, because you can follow the same approach of the current replay plugin (update buffer -> set dataloader -> train on experience).

Regarding the error, I may be wrong but it could be related to the implementation of AvalancheConcatDataset, it doesn't seem a problem with the dataloader.

Mattdl commented 3 years ago

@AntonioCarta I'm trying to make a solution that's also applicable for online data incremental learning, therefore we need to update the buffer during training as the data comes in. We already had some discussion on this in #360, but it seems to me a first solution with no drastic changes to the codebase is:

The next step would be to have some kind of meta-experiences (e.g. tasks/domains) to define the scenarios, and then the experiences (batches) for processing. I see it as something like SplitMNIST(n_experiences=5, batchwise_exps=True ) which would create the classic SplitMNIST, but the experiences (as they are implemented now) would increment per batch. But this requires fundamental changes in the creation of the scenarios, so I'd leave that to the Avalanche experts in there (:

I've implemented a first example with CoPE, but I first need to figure out how to fix this issue. From there on it can be expanded to other online methods. @vlomonaco thanks that might work, I'll give it a try! @AntonioCarta thanks, it indeed traces back to the next() on the AvalancheConcatDataset. If you have any idea what causes this behaviour, let me know (:

AntonioCarta commented 3 years ago

@AntonioCarta I'm trying to make a solution that's also applicable for online data incremental learning, therefore we need to update the buffer during training as the data comes in.

The problem I see with your current implementation is that the end-user is not aware that the scenario is data incremental. In Avalanche we separate data streams and strategies, therefore it make sense to me to prepare the mini-batches for a data incremental scenario outside the strategy.

Think about this problem. How do you compare other strategies with CoPE on the same scenario? With your solution it's much more difficult to understand if each strategy is training on the same experiences, because CoPE de facto splits the experience into multiple smaller experiences. I need to be aware of its internal implementation.

The next step would be to have some kind of meta-experiences (e.g. tasks/domains) to define the scenarios, and then the experiences (batches) for processing. I see it as something like SplitMNIST(n_experiences=5, batchwise_exps=True ) which would create the classic SplitMNIST, but the experiences (as they are implemented now) would increment per batch. But this requires fundamental changes in the creation of the scenarios, so I'd leave that to the Avalanche experts in there (:

This seems the main roadblock that you have right now and the reason why you chose the current solution. I think it's easy to add a data_incremental_generator to split a predefined benchmark as di_mnist = data_incremental(split_mnist5). We don't have anything ready right now but @lrzpellegrini can help with that.

@vlomonaco thanks that might work, I'll give it a try!

Unfortunately I don't think you will solve any problems with this, it doesn't look any different from what you're doing right now.

@AntonioCarta thanks, it indeed traces back to the next() on the AvalancheConcatDataset. If you have any idea what causes this behaviour, let me know (:

I have a vague idea but I'm not sure how to solve it. @lrzpellegrini can you help on this? Is it possible to improve AvalancheConcatDataset to allow a large number of concatenations? Otherwise, do we have any alternative solutions to concatenate datasets that are more efficient?

Mattdl commented 3 years ago

The problem I see with your current implementation is that the end-user is not aware that the scenario is data incremental.

I completely agree! I was trying a tentative intermediate solution, but the generator idea you proposed would be ideal:

... to split a predefined benchmark as di_mnist = data_incremental(split_mnist5)

I'll implement CoPE as an experience-based algorithm in this PR and change the according example later in another PR, when the solutions for the data_incremental_generator and AvalancheConcatDataset have been incorporated.

lrzpellegrini commented 3 years ago

Hi @Mattdl I'm trying to reproduce the error with the AvalancheConcatDataset. Do you have the complete code for this?

Mattdl commented 3 years ago

Hi @lrzpellegrini, great! I've opened the PR (see above), you can use the example in examples/copy.py.

lrzpellegrini commented 3 years ago

Thank you @Mattdl, I'll try this immediately!

lrzpellegrini commented 3 years ago

I'm encountering a different error in MultiTaskMultiBatchDataLoader, precisely in:

self.max_len = max([len(d) for d in self.dataloaders.values()])
Mattdl commented 3 years ago

Oh yeah probably because of this. You need to keep the line in before_forward to check if the memory is empty:

if len(self.replay_mem) == 0): return
vlomonaco commented 3 years ago

@lrzpellegrini let me know if we can close it since the original issue has been solved. Maybe you want to open a new issue for the improved concat?

lrzpellegrini commented 3 years ago

Yes, I was able to reproduce the issue. I'm doing some debugging to identify the cause (probably a ConcatDataset annidation). I'll open another issue soon.