pytorch / ignite

High-level library to help with training and evaluating neural networks in PyTorch flexibly and transparently.
https://pytorch-ignite.ai
BSD 3-Clause "New" or "Revised" License

Handling empty datasets in distributed metric computation #1242

Open linhr opened 4 years ago

linhr commented 4 years ago

🐛 Bug description

Metric computation does not work properly in distributed settings when some processes do not handle any batch in the dataset. It becomes a problem when small validation or test datasets are distributed to processes in an imbalanced manner.

How to Reproduce

Create a Python script named main.py with the following content.

import torch
import ignite.distributed as idist
from torch.utils.data import IterableDataset, DataLoader
from ignite.metrics import Loss
from ignite.engine.engine import Engine
from ignite.engine.events import Events

class SampleDataset(IterableDataset):
    def __iter__(self):
        # Only rank 0 yields a batch; every other rank sees an empty dataset.
        if idist.get_rank() == 0:
            yield torch.zeros((2, 3)), torch.ones((2, 3))

def report_metrics(engine):
    print(engine.state.metrics)

def test(local_rank):
    data_loader = DataLoader(SampleDataset(), batch_size=None)
    engine = Engine(lambda _engine, batch: batch)  # identity step: the batch itself is (y_pred, y)
    Loss(torch.nn.BCELoss(reduction="mean")).attach(engine, "loss")
    engine.add_event_handler(Events.COMPLETED, report_metrics)
    engine.run(data_loader)

with idist.Parallel(backend="gloo") as parallel:
    parallel.run(test)

Run the following command inside a CPU Docker container with PyTorch and Ignite installed.

python -m torch.distributed.launch --nproc_per_node=2 --use_env main.py

Problem 1

The command terminated with an error. Part of the output is shown below.

terminate called after throwing an instance of 'gloo::EnforceNotMet'
  what():  [enforce fail at /opt/conda/conda-bld/pytorch_1595629403081/work/third_party/gloo/gloo/transport/tcp/pair.cc:490] op.preamble.length <= op.nbytes. 8 vs 4

It seems there is a type inconsistency (int vs. float) inside idist.all_reduce() when compute() is called, because not all processes have called update() at least once. A simple fix could be changing this line to self._sum = 0.0.

However, this issue could affect other metrics as well. We probably need unit tests for such a scenario for all metrics.
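
To make the dtype mismatch concrete, here is a small, self-contained illustration (not ignite's actual code; the values are made up) of why gloo complains about mismatched byte sizes ("8 vs 4"), together with the suggested fix.

import torch

# A rank that never called update() would reduce the integer initial value,
# while a rank that did update would reduce a float accumulator.
never_updated = torch.tensor(0)         # torch.int64 -> 8 bytes per element
updated = torch.tensor(0.0) + 0.1234    # torch.float32 -> 4 bytes per element
print(never_updated.dtype, updated.dtype)

# Suggested fix: initialize the accumulator as a float in reset(), e.g.
#     self._sum = 0.0
# so that every rank reduces a value of the same dtype.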

Problem 2

In the above script, if we change Loss(...) to the precision or recall metric (e.g. Precision()), we get the following error message.

Engine run is terminating due to exception: Precision must have at least one example before it can be computed..

The issue is that the verification should actually be moved after idist.all_reduce(). Although some processes may have seen an empty dataset, the metric is still valid collectively.
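
As an illustration of this ordering, here is a hedged sketch: compute_mean_loss is a made-up standalone helper, not ignite's actual Loss.compute(), and it needs to run inside an idist.Parallel context so the collective calls are valid.

import ignite.distributed as idist
from ignite.exceptions import NotComputableError

def compute_mean_loss(local_sum: float, local_num_examples: int) -> float:
    # Every rank participates in the collective calls, including ranks whose
    # local accumulators are still at their initial (empty) values.
    total_sum = idist.all_reduce(local_sum)
    total_num = idist.all_reduce(local_num_examples)
    # Only after the reduction do we know whether the metric is computable
    # collectively across all processes.
    if total_num == 0:
        raise NotComputableError("Loss must have at least one example before it can be computed.")
    return total_sum / total_num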

Problem 3

After fixing Problem 2, there is still an issue with multi-label precision or recall. For example, changing Loss(...) to Precision(is_multilabel=True, average=True) and running the script will give the following error:

Engine run is terminating due to exception: 'float' object has no attribute 'mean'.

The issue is with this line. Because not all processes have called update() at least once, there is again a type inconsistency: in some processes self._true_positives is of type float, while in others it is a scalar tensor.

Environment

vfdev-5 commented 4 years ago

@linhr thanks for the report! Let me reproduce and investigate the issue.

cc @n2cholas: as we are working on metrics right now, we may have to take this into account

vfdev-5 commented 4 years ago

@linhr thanks for the clean reproduction code snippet. I was playing around and I confirm that there can be some inconsistencies between dtypes.

It becomes a problem when small validation or test datasets are distributed to processes in an imbalanced manner.

Actually, is it correct to distribute the data like that, creating asymmetry?

I just replaced the dataset so that there is one batch common to all processes and one batch only for rank zero, and the code seems to run as expected:

class SampleDataset(IterableDataset):
    def __iter__(self):
        # One batch seen by every rank...
        yield torch.zeros((4, 3)), torch.ones((4, 3))

        # ...and one extra batch seen only by rank 0.
        if idist.get_rank() == 0:
            yield torch.zeros((2, 3)), torch.ones((2, 3))

Problem 1

I agree we should fix the dtype inconsistency between initialized and updated accumulators.

Problem 2

Yes, I agree this is a bug: all other metrics use sync_all_reduce, which reduces before verifying the number of examples and raising an exception.

Problem 3

Yes, looks like a bug, but I need to take a deeper look into it...

If you would like to help us with a fix for the first two issues, please feel free to send a PR. Thanks again!

n2cholas commented 4 years ago

Problem 1

This will be fixed once #1238 is merged. All the metrics will create their accumulators on the right device, except Precision and Recall in the non-multilabel case, since we don't know the shape of the updates during initialization (this logic should clarify why).

Problem 2

I'm not sure if moving the check after the all_reduce is sufficient. In some cases, you would have a shape mismatch since the initialized value would just be a 0 while the ranks that actually updated a value will have some other shape, so the all-reduce would fail. Is there any way to all-reduce only a subset of ranks? If you knew the shape of _true_positives and _positives in advance, you could replace any empty _true_positives and _positives with tensors filled with 0s, then the operation would be fine. I'm not too familiar with how the distributed primitives work, so please correct me if I'm wrong.

Problem 3

This should also be fixed once #1238 is merged.

linhr commented 4 years ago

Thanks @vfdev-5 and @n2cholas for the reply! Glad to learn that many of the issues will be fixed in #1238!

Actually, is it correct to distribute the data like that, creating asymmetry?

I just replaced the dataset so that there is one batch common to all processes and one batch only for rank zero, and the code seems to run as expected.

Yeah, this issue is about an edge case that is unlikely to happen in most setups. Here is a situation similar to my use case when I discovered the issue: there are 4 GPUs (and 4 processes) for distributed training; the validation data is partitioned into 3 files, and each process loads one or more of the data files during validation. We can see that one process does not handle any validation data, so metric calculation needs to take this into account. At first I was puzzled by the error message, since it didn't directly point me to the data imbalance in my setup. :)

Problem 2

I'm not sure if moving the check after the all_reduce is sufficient. In some cases, you would have a shape mismatch since the initialized value would just be a 0 while the ranks that actually updated a value will have some other shape, so the all-reduce would fail. Is there any way to all-reduce only a subset of ranks? If you knew the shape of _true_positives and _positives in advance, you could replace any empty _true_positives and _positives with tensors filled with 0s, then the operation would be fine. I'm not too familiar with how the distributed primitives work, so please correct me if I'm wrong.

My understanding is that moving the check after all_reduce would be sufficient for now, since the only case where _true_positives and _positives are non-scalar tensors is is_multilabel=True and average=False, which does not support distributed metric calculation anyway (see the warning here).

I agree that supporting all_reduce for potentially empty tensors could be tricky. I guess we need to investigate a solution for the is_multilabel=False and average=False case. However, one thing that confused me is how this non-average multiclass case is currently implemented. The docstring here says the metric will return a tensor with one entry per class, but I do not see it implemented this way in update() (code). Maybe I missed something?

vfdev-5 commented 4 years ago

Yeah, this issue is about an edge case that is unlikely to happen in most setups. Here is a situation similar to my use case when I discovered the issue: there are 4 GPUs (and 4 processes) for distributed training; the validation data is partitioned into 3 files, and each process loads one or more of the data files during validation. We can see that one process does not handle any validation data, so metric calculation needs to take this into account. At first I was puzzled by the error message, since it didn't directly point me to the data imbalance in my setup. :)

@linhr thanks for providing the info about your use case. Wouldn't it be better to read everything in all processes and then split the data in a more balanced way between processes? Or to use 3 processes instead of 4?

My understanding is that moving the check after all_reduce would be sufficient for now, since the only case where _true_positives and _positives are non-scalar tensors is is_multilabel=True and average=False, which does not support distributed metric calculation anyway (see the warning here).

I think this is currently a bug, as for other metrics all_reduce is performed before computing any metric values. We should keep that. The currently unsupported case will be supported after solving the same issue with EpochMetric (a PR is in progress). But in all cases, the reduction or data gathering should come first, IMO.

EDIT: it also seems there is a wrong condition in precision/recall for raising NotComputableError: self._positives > 0. We can have a situation where there is an update but no positive occurrences at all...

I agree that supporting all_reduce for potentially empty tensors could be tricky.

Yes, I agree. This could add an overhead (by pre-checking input sizes, etc.) for nominal cases, which is not that good.

However, one thing that confused me is how this non-average multiclass case is currently implemented. The docstring here says the metric will return a tensor with one entry per class, but I do not see it implemented this way in update() (code). Maybe I missed something?

In the multiclass case, we have y_pred as logits of shape (BatchSize, NumClasses) and y of shape (BatchSize, ). In the case average=False, we have to obtain precision or recall as a tensor of shape (NumClasses, ). For that we compute true positives and positives as tensors of shape (NumClasses, ) from correct, which is a tensor of shape (BatchSize, NumClasses), by summing it over dimension zero. Does that sound correct to you?
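
For reference, here is a minimal self-contained sketch of that shape bookkeeping for precision (made-up tensors; a simple clamp stands in for ignite's eps handling, and this is not the actual update() code).

import torch
import torch.nn.functional as F

batch_size, num_classes = 4, 3
y_pred = torch.randn(batch_size, num_classes)        # logits, shape (BatchSize, NumClasses)
y = torch.randint(0, num_classes, (batch_size,))     # targets, shape (BatchSize,)

indices = torch.argmax(y_pred, dim=1)                # predicted classes, shape (BatchSize,)
y_ohe = F.one_hot(y, num_classes)                    # shape (BatchSize, NumClasses)
pred_ohe = F.one_hot(indices, num_classes)           # shape (BatchSize, NumClasses)

correct = y_ohe * pred_ohe                           # shape (BatchSize, NumClasses)
true_positives = correct.sum(dim=0)                  # shape (NumClasses,)
positives = pred_ohe.sum(dim=0)                      # predicted positives, shape (NumClasses,)
precision_per_class = true_positives / positives.clamp(min=1)
print(precision_per_class.shape)                     # torch.Size([3])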

linhr commented 4 years ago

Thanks @vfdev-5 !

Wouldn't it be better to read everything in all processes and then split the data in a more balanced way between processes? Or to use 3 processes instead of 4?

Yes, definitely. Besides fixing the issues for the edge case, I agree it is more reasonable to distribute data in a more balanced way, depending on how many processes we have.

But in all cases, the reduction or data gathering should come first, IMO.

Agreed!

In the multiclass case, we have y_pred as logits of shape (BatchSize, NumClasses) and y of shape (BatchSize, ). In the case average=False, we have to obtain precision or recall as a tensor of shape (NumClasses, ). For that we compute true positives and positives as tensors of shape (NumClasses, ) from correct, which is a tensor of shape (BatchSize, NumClasses), by summing it over dimension zero. Does that sound correct to you?

Thanks for the explanation! For this non-average multiclass case, I didn't realize _true_positives and _positives are actually tensors in update(), since they are initialized as 0 in reset(). Now the logic is clear to me. (I guess we cannot initialize them as tensors because we do not know their size in reset() until we see real batches when updating the metric.)

So an empty dataset is an issue for the non-average multiclass case as well. (My original post only looked at the multilabel case.) After thinking about it a bit more, I found we can probably solve the problem with two all_reduce operations: one to get the tensor size and the other to get the tensor data. The idea is illustrated in the following example.

# test.py
import time
import torch
import ignite.distributed as idist

def all_reduce_metric(value):
    # First all_reduce: agree on the maximum tensor size across processes.
    (size,) = value.shape
    max_size = idist.all_reduce(size, "MAX")
    # Second all_reduce: pad the local tensor with zeros up to the maximum
    # size so that every process reduces a tensor of the same shape.
    output = torch.zeros((max_size,))
    output[:size] = value
    return idist.all_reduce(output)

def test(local_rank):
    before = torch.arange(idist.get_rank())
    after = all_reduce_metric(before)  # this is not an in-place operation
    time.sleep(idist.get_rank())  # sleep to avoid messing up `print()`
    print('before:', before, 'after:', after)

with idist.Parallel(backend="gloo") as parallel:
    parallel.run(test)

Run the distributed command:

python -m torch.distributed.launch --nproc_per_node=4 --use_env test.py

Output:

before: tensor([], dtype=torch.int64) after: tensor([0., 2., 2.])
before: tensor([0]) after: tensor([0., 2., 2.])
before: tensor([0, 1]) after: tensor([0., 2., 2.])
before: tensor([0, 1, 2]) after: tensor([0., 2., 2.])

Let me know if the approach makes sense. I'm happy to discuss improvements/alternatives. :)

For the is_multilabel=True and average=False case, as well as EpochMetric (I guess it's #1229?), the task is to collect tensors whose size depends on the number of samples. I assume we probably need to consider situations where each process gets a different number of batches (or even no batch), where the size of the last batch differs among processes, etc. I haven't played with the all_gather operation myself, so I'm not sure if these situations are already taken care of.

vfdev-5 commented 4 years ago

Let me know if the approach makes sense. I'm happy to discuss improvements/alternatives. :)

@linhr yes, this can be one approach to handling variable-size data. But calling the first all_reduce on equal-sized data is exactly the overhead I mentioned in a previous message. I was wondering if we could make the metric's collective ops like sync_all_reduce configurable, such that the user could select their own method for asymmetric cases while the current implementation stays the default... That way, in your case, you could write your own all_reduce_metric and deal with the asymmetry.

For the is_multilabel=True and average=False case, as well as EpochMetric (I guess it's #1229?), the task is to collect tensors whose size depends on the number of samples. I assume we probably need to consider situations where each process gets a different number of batches (or even no batch), where the size of the last batch differs among processes, etc. I haven't played with the all_gather operation myself, so I'm not sure if these situations are already taken care of.

Yes, here the idea is the same as what you did for all_reduce: a) create an empty max-size tensor as a placeholder (maybe undefined values should be marked differently, e.g. as NaN), b) put the data into the placeholder, c) all_gather => a big tensor of size placeholder x world_size on each process, d) remove the undefined values and compute the metric.
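
Here is a hedged sketch of that padding-based gathering for 1-D tensors. The helper name gather_uneven is made up, NaN marks the undefined slots (so it assumes the real values are never NaN), and it needs to run inside an idist.Parallel context.

import torch
import ignite.distributed as idist

def gather_uneven(value: torch.Tensor) -> torch.Tensor:
    # a) placeholder of the maximum size across processes, filled with NaN
    #    to mark undefined slots
    local_size = torch.tensor([value.shape[0]], dtype=torch.float32)
    max_size = int(idist.all_reduce(local_size, "MAX").item())
    placeholder = torch.full((max_size,), float("nan"))
    # b) put the local data into the placeholder
    placeholder[: value.shape[0]] = value.float()
    # c) all_gather => a tensor of shape (max_size * world_size,) on each process
    gathered = idist.all_gather(placeholder)
    # d) drop the undefined (NaN) entries before computing the metric
    return gathered[~torch.isnan(gathered)]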

vfdev-5 commented 4 years ago

Since v1.7.0, PyTorch seems to support uneven inputs across participating processes: https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel.join