NVIDIA / apex

A PyTorch Extension: Tools for easy mixed precision and distributed training in Pytorch
BSD 3-Clause "New" or "Revised" License

Does data_prefetcher() really speed up training? #304

Open Antsypc opened 5 years ago

Antsypc commented 5 years ago

I used your Python code https://github.com/NVIDIA/apex/blob/master/examples/imagenet/main_amp.py#L256

My code is

I think this data_prefetcher should speed up training, because another stream sends data to GPU memory while the model is running on the GPU, so there is only a very small gap between two iterations.

However, this trick does not work for me. Please help.

mcarilli commented 5 years ago

It really depends on your model. It's hard to tell the impact of non-overlapped dataloading, and if prefetching successfully overlaps, without looking at a visual profile. For data prefetching to overlap, the source batch on the CPU must be pinned (in other words, the dataloader should receive the argument pin_memory=True). Also, if your data batch is a custom type, you must write a pin_memory method for your batch. See the documentation for DataLoader (scroll down past the arg list, Note, and Warning for the SimpleCustomBatch example, which I wrote).
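A minimal sketch of the custom-batch case described above, modeled on the SimpleCustomBatch example in the PyTorch DataLoader documentation. The class name `PinnableBatch`, the collate function and the toy dataset are hypothetical; `pin_memory=torch.cuda.is_available()` is only there so the sketch also runs on a CPU-only machine (on a GPU machine you would simply pass `pin_memory=True`).

```python
import torch
from torch.utils.data import DataLoader, TensorDataset


class PinnableBatch:
    """Hypothetical custom batch type wrapping an input and a target tensor."""

    def __init__(self, samples):
        inputs, targets = zip(*samples)
        self.inp = torch.stack(inputs, 0)
        self.tgt = torch.stack(targets, 0)

    # DataLoader(pin_memory=True) calls this method on custom batch types,
    # so the CPU-side tensors end up in page-locked memory.
    def pin_memory(self):
        self.inp = self.inp.pin_memory()
        self.tgt = self.tgt.pin_memory()
        return self


def collate_batch(samples):
    return PinnableBatch(samples)


dataset = TensorDataset(torch.arange(50, dtype=torch.float32).view(10, 5),
                        torch.arange(10))
loader = DataLoader(dataset, batch_size=4, collate_fn=collate_batch,
                    pin_memory=torch.cuda.is_available())

batches = list(loader)
for b in batches:
    print(tuple(b.inp.shape), tuple(b.tgt.shape))
```

Without the `pin_memory` method, the DataLoader would have no way to pin a `PinnableBatch`, and a later `non_blocking=True` copy could not overlap with compute.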

The prefetcher is not an "official" piece of Apex; it was just a cool piece of code that we found useful for ImageNet. I think it gives us a minor (maybe 5%) speedup for ResNet-50, but that depends on batch size and whether we're single-GPU or multi-GPU.

ngimel commented 5 years ago

Yes, data_prefetcher is meant to speed up training, and users have found it helpful: https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548/19. It might not help in your case if you are bottlenecked on other parts of the input pipeline.

Antsypc commented 5 years ago

@mcarilli @ngimel Thanks for your reply. I ran some more tests, and data_prefetcher() still doesn't speed things up. I found something I want to share with you.

data_prefetcher() is designed to overlap sending data to the GPU with GPU computation, right? That way the GPU computation and the data transfer run in parallel and save time.

But I think that in PyTorch's CUDA implementation each device can execute only one operation at a time. All operations wait in a single queue; although PyTorch has an asynchronous option, non_blocking=True, that does not mean all operations execute in parallel. Asynchronous in PyTorch means concurrent rather than parallel computation.

My batch is a tuple with 3 elements, which I send to the GPU with data_prefetcher(). My model has two computation stages: the first stage needs the first element of the tuple, and the second stage needs the second element.

I logged the time of each computation stage and found that both stages take much longer than before. So I think that when data is sent to the GPU asynchronously, it isn't actually transferred right away; only when the computation needs the data is it sent to GPU memory. That's why the time of both computation stages increased.

I think that's why data_prefetcher() is not helpful for speeding up training. But it's helpful for saving GPU memory.

The above summarizes my tests and experience, and maybe it's not right. But I found the following in the official PyTorch documentation (CUDA semantics, asynchronous execution):

Asynchronous execution

By default, GPU operations are asynchronous. When you call a function that uses the GPU, the operations are enqueued to the particular device, but not necessarily executed until later. This allows us to execute more computations in parallel, including operations on CPU or other GPUs.

In general, the effect of asynchronous computation is invisible to the caller, because (1) each device executes operations in the order they are queued, and (2) PyTorch automatically performs necessary synchronization when copying data between CPU and GPU or between two GPUs. Hence, computation will proceed as if every operation was executed synchronously.

You can force synchronous computation by setting environment variable CUDA_LAUNCH_BLOCKING=1. This can be handy when an error occurs on the GPU. (With asynchronous execution, such an error isn’t reported until after the operation is actually executed, so the stack trace does not show where it was requested.)

As an exception, several functions such as to() and copy_() admit an explicit non_blocking argument, which lets the caller bypass synchronization when it is unnecessary. Another exception is CUDA streams, explained below.

Maybe I have some misunderstanding.

BTW, I have another problem. When I run training on two nodes with 16 GPUs on each node instead of one node with 16 GPUs, the time to send data to the GPU nearly doubles! I am very confused.

Thanks again.

mcarilli commented 5 years ago

By default, Pytorch enqueues all operations involving the gpu (kernel launches, cpu->gpu memcopies, and gpu->cpu memcopies) on the same stream (the "default stream"). Operations on the same stream are serialized and can never overlap. For two operations to overlap, they must be in different streams. Also, for cpu->gpu and gpu->cpu memcopies in particular, the CPU-side memory must be pinned, otherwise the memcopy will be blocking with respect to all streams.

The forward pass is performed in the default stream. Therefore, for a cpu->gpu prefetch (of the next iteration's data) to overlap with the forward pass of the current iteration:

  1. the data batch on the cpu must be pinned, and
  2. the prefetch must be carried out in a side stream.

Our data_prefetcher satisfies both of these requirements.

For overlapped prefetching, supplying pin_memory=True to the dataloader is always required (to satisfy 1.). If your data batch is a tuple of Tensors, then supplying pin_memory=True and using the prefetcher should be enough to enable overlap. If any element of your data batch tuple wraps its Tensors in a custom class, you must also give that custom class a pin_memory method, which the dataloader will call to ensure the batch's CPU-side memory is pinned (also needed to satisfy 1.), as I said in my previous post.
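The two requirements above can be sketched as a small iterator in the spirit of apex's data_prefetcher. This is not the apex code itself: the class name `CUDAPrefetcher`, the assumption that every batch is a list/tuple of Tensors, and the CPU pass-through fallback (so the sketch runs without a GPU) are illustrative choices.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset


class CUDAPrefetcher:
    """Copies batch i+1 to the GPU on a side stream while batch i is being used.

    Assumes each batch is a list/tuple of Tensors and the DataLoader was built
    with pin_memory=True. Without CUDA it simply passes batches through.
    """

    def __init__(self, loader, device="cuda"):
        self.loader = iter(loader)
        self.use_cuda = torch.cuda.is_available()
        self.device = torch.device(device if self.use_cuda else "cpu")
        self.stream = torch.cuda.Stream() if self.use_cuda else None
        self._preload()

    def _preload(self):
        try:
            batch = next(self.loader)
        except StopIteration:
            self.next_batch = None
            return
        if self.use_cuda:
            # Requirement 2: issue the host-to-device copies on a side stream.
            # They only run asynchronously if the CPU tensors are pinned (requirement 1).
            with torch.cuda.stream(self.stream):
                batch = [t.to(self.device, non_blocking=True) for t in batch]
        self.next_batch = batch

    def __iter__(self):
        return self

    def __next__(self):
        batch = self.next_batch
        if batch is None:
            raise StopIteration
        if self.use_cuda:
            compute = torch.cuda.current_stream()
            # The compute stream must not read the batch before its copies finish.
            compute.wait_stream(self.stream)
            for t in batch:
                # The memory was allocated on the side stream but is used on `compute`.
                # record_stream() returns None, so it is called only for its side effect.
                t.record_stream(compute)
        # Start copying the following batch while the caller computes on this one.
        self._preload()
        return batch


loader = DataLoader(TensorDataset(torch.randn(10, 3), torch.arange(10)),
                    batch_size=4, pin_memory=torch.cuda.is_available())
out = []
for inputs, labels in CUDAPrefetcher(loader):
    out.append((inputs, labels))  # a real loop would run forward/backward here
```

Whether this actually hides the copy time still depends on the model; a visual profile is the way to confirm the HtoD memcpy overlaps with the forward kernels.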

I'm not sure why the dataloading time doubles for a 2-node run. Are the files of your dataset only on one node's hard drive, and are they being accessed from the other node via a shared network drive or something? This would mean that one node is slower than another. For best results, the full dataset's files should be present on the hard drive of both nodes.

Lausannen commented 5 years ago

@mcarilli I have a question about the training process. Within one epoch, I measured the time spent in four parts:

  1. for _, (img, tgt) in enumerate(dataloader), which takes 4.18 s
  2. transferring img and tgt from CPU to GPU, which takes 4.96 s
  3. the model forward pass, which takes 2.67 s
  4. the model backward pass, which takes 5.50 s

The running time of this single epoch is the sum of the four parts. My question is why Part 1 takes so much time, since I already use multiple workers to speed up data preprocessing. Can you tell me what happens in next(dataloader)? If this next() call executes on the CPU, is there a way to overlap it with GPU work? I think the prefetcher mentioned above can optimize Part 2, but I am confused about Part 1. Thank you very much!

mcarilli commented 5 years ago

I'm not sure how you're obtaining your timings, but cpu timings can be deceptive because the gpu operates asynchronously. It's hard to tell what's really causing your bottleneck without looking at a visual profile. You can try profiling using the example I posted here. main_amp.py in the gist is identical to apex/examples/imagenet/main_amp.py. When I create those profiles, I see that prefetcher.next() takes a negligibly small amount of time compared to the forward and backward pass. I also see that the memcopy of the next iteration's data does successfully overlap with the forward pass.
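To illustrate why the CPU-side timings above can mislead: because CUDA calls only enqueue work, a wall-clock timer around them mostly measures the launch unless the device is synchronized first. A minimal sketch, where the helper name `timed` is hypothetical and `torch.cuda.synchronize()` is the real PyTorch call:

```python
import time

import torch


def timed(fn, *args, **kwargs):
    """Return (result, seconds), synchronizing CUDA so queued GPU work is counted."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()  # drain work queued before the measured region
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    if torch.cuda.is_available():
        torch.cuda.synchronize()  # wait for the kernels launched inside fn
    return result, time.perf_counter() - start


x = torch.randn(256, 256)
y, seconds = timed(torch.matmul, x, x)
print(f"matmul took {seconds * 1e3:.2f} ms")
```

Note that synchronizing also destroys the overlap the prefetcher is trying to create, so this is only useful for isolating one region; a visual profile remains the better tool for checking whether copies and compute overlap.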

Lausannen commented 5 years ago

@mcarilli Thank you very much! I used the NVIDIA Visual Profiler to analyze my code as you suggested. I think my dataloader problem may be related to initialization; now the time spent preparing data is very small. Next I want to optimize the time spent transferring data from CPU to GPU. However, when I used the prefetcher, I ran into a problem: the dataloader returned NoneType instead of images and targets. I also noticed that the images and targets were correct before they went through record_stream() (memory -> CUDA memory). In your example, I found comments saying that if record_stream does not work, we could instead allocate new space in CUDA memory, but I could not get that to work. Below is my prefetcher code. The differences from your example are that in my case the images are organized as a list ([tensor, tensor, tensor, tensor]) and the GTs are a dictionary. Sorry to bother you again.
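For a batch shaped like the one described here (a list of image tensors plus a dict of target tensors), one option is a small recursive helper that returns the moved tensors in the same container types instead of rebuilding them by hand. A sketch under stated assumptions: `to_device` and the toy shapes are hypothetical, and inside a prefetcher the calls would sit under `with torch.cuda.stream(...)` with pinned source tensors.

```python
import torch


def to_device(obj, device, non_blocking=True):
    """Recursively move Tensors inside lists, tuples and dicts to `device`."""
    if isinstance(obj, torch.Tensor):
        return obj.to(device, non_blocking=non_blocking)
    if isinstance(obj, dict):
        return {k: to_device(v, device, non_blocking) for k, v in obj.items()}
    if isinstance(obj, list):
        return [to_device(v, device, non_blocking) for v in obj]
    if isinstance(obj, tuple):
        return tuple(to_device(v, device, non_blocking) for v in obj)
    return obj  # non-tensor leaves (ints, strings, None) are left untouched


device = "cuda" if torch.cuda.is_available() else "cpu"
images = [torch.zeros(3, 4, 4) for _ in range(4)]
targets = {"boxes": torch.zeros(2, 4), "labels": torch.tensor([1, 2])}
images_dev = to_device(images, device)
targets_dev = to_device(targets, device)
```

Keeping `targets` a dict (rather than converting it to a list) means the training code can keep indexing it by key.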

class data_prefetcher():
    def __init__(self, loader):
        self.loader = iter(loader)
        self.stream = torch.cuda.Stream()
        self.preload()

    def preload(self):
        try:
            self.next_features, self.next_targets, _ = next(self.loader)
        except StopIteration:
            self.next_features = None
            self.next_targets  = None
            return
        # self.next_features_gpu = []
        # self.next_targets_gpu = {}
        # for xaf in self.next_features:
        #     self.next_features_gpu.append(torch.empty_like(xaf, device='cuda'))
        # for key in self.next_targets.keys():
        #     self.next_targets_gpu[key] = torch.empty_like(self.next_targets[key], device='cuda')
        # self.stream.wait_stream(torch.cuda.current_stream())
        with torch.cuda.stream(self.stream):
            self.next_features = [single_feature.cuda(non_blocking=True) for single_feature in self.next_features]
            if isinstance(self.next_targets, dict):
                for key in self.next_targets.keys():
                    self.next_targets[key] = self.next_targets[key].cuda(non_blocking=True)
            else:
                self.next_targets = [single_target.cuda(non_blocking=True) for single_target in self.next_targets]
            # for index in range(len(self.next_features_gpu)):
            #     self.next_features_gpu[index].copy_(self.next_features[index], non_blocking=True)
            # for key in self.next_targets_gpu.keys():
            #     self.next_targets_gpu[key].copy_(self.next_targets[key], non_blocking=True)

    def next(self):
        torch.cuda.current_stream().wait_stream(self.stream)  
        # features = self.next_features_gpu
        # targets = self.next_targets_gpu
        features = self.next_features
        targets = self.next_targets
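        if features is not None:
            # note: record_stream() returns None, so this list ends up full of None
            features = [xaf.record_stream(torch.cuda.current_stream()) for xaf in features]
        if targets is not None:
            # same problem here; the dict's keys are also lost
            targets = [targets[xaf].record_stream(torch.cuda.current_stream()) for xaf in targets.keys()]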
        self.preload()
        return features, targets
mcarilli commented 5 years ago

Using the existing uncommented code (aka record_stream) is the preferred approach. The above code looks like it should work, and properly handle the end of the epoch where next_features and next_targets are None. What exactly is the error you see?

Also, I said this before but that was a while ago so it's worth reminding: the prefetcher will not enable overlap unless you also supply pin_memory=True to the Pytorch dataloader constructor.

Lausannen commented 5 years ago

@mcarilli Sorry for my late reply. I am using pin_memory=True in the PyTorch DataLoader. The code did not report an obvious error. However, after record_stream(), the features became NoneType, which caused failures.

    def next(self):
        torch.cuda.current_stream().wait_stream(self.stream)
        features = self.next_features
        targets  = self.next_targets
        if features is not None:
            if features[0] is not None:
                print("This is rank {} and features[0] type {}".format(get_rank(), type(features[0])))
            features = [xaf.record_stream(torch.cuda.current_stream()) for xaf in features]
            if features[0] is None:
                print("This is rank {} and features[0] is None".format(get_rank()))
        if targets is not None:
            print("This is rank {} and targets are not None".format(get_rank()))
            targets = [targets[xaf].record_stream(torch.cuda.current_stream()) for xaf in targets.keys()]
            if targets is None:
                print("This is rank {} and targets is None".format(get_rank()))
        self.preload()
        return features, targets

After running the above code, I get the following output:

This is rank 0 and features[0] type <class 'torch.Tensor'>
This is rank 0 and features[0] is None
This is rank 1 and features[0] type <class 'torch.Tensor'>
This is rank 1 and features[0] is None
This is rank 2 and features[0] type <class 'torch.Tensor'>
This is rank 2 and features[0] is None
This is rank 3 and features[0] type <class 'torch.Tensor'>
This is rank 3 and features[0] is None
Lausannen commented 5 years ago

I tried splitting the features list and targets dictionary into individual tensors, and the code then runs normally. However, getting batch data seems very slow compared with the traditional way.

KylinA1 commented 5 years ago
    def preload(self):
        try:
            self.next_input, self.next_target = next(self.loader)
        except StopIteration:
            self.next_input = None
            self.next_target = None
            return
        with torch.cuda.stream(self.stream):
            self.next_input = self.next_input.cuda(non_blocking=True)
            self.next_target = self.next_target.cuda(non_blocking=True)
            # With Amp, it isn't necessary to manually convert data to half.
            # if args.fp16:
            #     self.next_input = self.next_input.half()
            # else:
            self.next_input = self.next_input.float()
            self.next_input = self.next_input.sub_(self.mean).div_(self.std)

Why not put the try/except inside the stream context? That step is also time-consuming. Also, I timed preload() inside next(), and it still takes a lot of time.

Roffild commented 5 years ago
import torch
from torch.utils.data import DataLoader, TensorDataset
# _DataLoaderIter only exists in older PyTorch releases; later versions replaced it
# with _SingleProcessDataLoaderIter and _MultiProcessingDataLoaderIter.
from torch.utils.data.dataloader import _DataLoaderIter

class CUDAPrefetcher(_DataLoaderIter):
    def __init__(self, loader, device=None, priority=0):
        if not torch.cuda.is_available():
            raise Exception("Only CUDA")
        super(CUDAPrefetcher, self).__init__(loader)
        self.device = device
        self.stream = torch.cuda.Stream(device=device, priority=priority)
        self.last = None

    def __next__(self):
        # Make the default stream wait for the side-stream copy of self.last
        torch.cuda.default_stream(device=self.device).wait_stream(stream=self.stream)
        result = self.last
        if result is None:
            # Nothing prefetched yet (first batch): copy synchronously
            result = super(CUDAPrefetcher, self).__next__()  # may raise StopIteration
            for x, d in enumerate(result):
                result[x] = d.to(device=self.device, non_blocking=False)
        try:
            # Prefetch the following batch on the side stream
            self.last = super(CUDAPrefetcher, self).__next__()
            with torch.cuda.stream(stream=self.stream):
                for x, d in enumerate(self.last):
                    self.last[x] = d.to(device=self.device, non_blocking=True)
        except StopIteration:
            self.last = None
        return result

# Usage (inputs, outputs and dev are defined elsewhere):
for ip, op in CUDAPrefetcher(DataLoader(TensorDataset(inputs, outputs), batch_size=int(20000), pin_memory=True), device=dev, priority=-10):
    pass  # training step goes here

This is my attempt to solve the problem, but there is no acceleration.

youngfly11 commented 5 years ago

@Lausannen, have you fixed this bug?

Lausannen commented 5 years ago

@youngfly11 Not yet. Maybe https://zhuanlan.zhihu.com/p/80695364 can help you.

ggaemo commented 4 years ago

(Quoting @Lausannen's comment above, in which the features became NoneType after record_stream().)

I have the same issue. Why does record_stream produce NoneType?

ggaemo commented 4 years ago

The solution I found was to manually keep every variable as an attribute of the data_prefetcher (self.attribute_name = attribute).

youngfly11 commented 4 years ago

(Quoting @ggaemo's comment above.)

Hi, how did you solve this problem? Could you show a code snippet here for clarity? Thanks.

qiuhaining commented 4 years ago

(Quoting @mcarilli's first reply above.)

Hello, I found a bug: it is possible to normalize self.next_input before the other stream has finished sending the data to GPU memory!

        with torch.cuda.stream(self.stream):
            self.next_input = self.next_input.cuda(non_blocking=True)
            self.next_target = self.next_target.cuda(non_blocking=True)
            # With Amp, it isn't necessary to manually convert data to half.
            # if args.fp16:
            #     self.next_input = self.next_input.half()
            # else:
            self.next_input = self.next_input.float()
            self.next_input = self.next_input.sub_(self.mean).div_(self.std)

DelightRun commented 4 years ago

After some deep profiling, I think the problem is caused by record_stream(torch.cuda.current_stream()), which blocks the default stream until the copy completes. Here is what the PyTorch documentation says about record_stream:

Ensures that the tensor memory is not reused for another tensor until all current work queued on stream are complete.

Notice the phrase "until all current work queued on stream are complete." It means the tensor memory will not be reused while the current stream is still working. But for some reason, it blocks the default stream until the tensor's stream is done.

Here is the timeline (profiler screenshot). As you can see, the default stream is blocked until the HtoD copy is done.

After removing the record_stream call: (profiler screenshot)
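For anyone who wants to try a variant without record_stream(), the commented-out code in Lausannen's prefetcher above shows the other option mcarilli mentioned: allocate the GPU buffers on the current (compute) stream and only perform the copy on the side stream. A minimal sketch of that idea, assuming pinned CPU tensors; the helper name and the CPU fallback are hypothetical, and mcarilli notes that the record_stream version is the preferred approach.

```python
import torch


def copy_to_preallocated(cpu_tensors, side_stream):
    """Copy CPU tensors into GPU buffers that are allocated on the current stream.

    Because the buffers belong to the current stream, record_stream() is not
    needed. The caller must still run
    torch.cuda.current_stream().wait_stream(side_stream) before using them.
    """
    if not torch.cuda.is_available():
        return [t.clone() for t in cpu_tensors]  # CPU fallback so the sketch runs anywhere
    compute = torch.cuda.current_stream()
    buffers = [torch.empty_like(t, device="cuda") for t in cpu_tensors]
    # Don't start writing until the compute stream is done with any reused memory.
    side_stream.wait_stream(compute)
    with torch.cuda.stream(side_stream):
        for dst, src in zip(buffers, cpu_tensors):
            dst.copy_(src, non_blocking=True)
    return buffers


side = torch.cuda.Stream() if torch.cuda.is_available() else None
gpu = copy_to_preallocated([torch.ones(2, 3), torch.arange(4)], side)
if torch.cuda.is_available():
    torch.cuda.current_stream().wait_stream(side)
```

Either way, the compute stream still has to wait for the copy before it can consume the batch; the question is only whether the wait happens early (blocking the current iteration) or just before the batch is used.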

wzsunshine commented 4 years ago

@DelightRun Hi, thanks for showing the deep profiling results, they help! I still have some confusion about the data prefetcher: (1) Could you explain this sentence in more detail? Why pinned memory must be used here confuses me a little; I thought memcpyAsync would be called anyway. ("for cpu->gpu and gpu->cpu memcopies in particular, the CPU-side memory must be pinned, otherwise, the memcopy will be blocking with respect to all streams.").

(2) Looking at the data_prefetcher code, I think it prefetches two batches before the first iteration (something like the fifth iteration's data overlapping with the processing of the third iteration's batch). I am not sure why it prefetches two batches here. ("for a cpu->gpu prefetch (of the next iteration's data) to overlap with the forward pass of the current iteration")

Appreciate any response.

wico-silva commented 2 years ago

For those getting NoneType (cc: @ggaemo and @mcarilli), it's probably because you are assigning your inputs and/or targets to the return of record_stream(). Taking the code above as an example:

# xaf.record_stream(torch.cuda.current_stream()) always returns None, so the list will be full of None values
features = [xaf.record_stream(torch.cuda.current_stream()) for xaf in features]

you should instead do something like this:

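features_list = []
for v in features:  # features is a list of tensors in the code above
    v.record_stream(torch.cuda.current_stream())  # called for its side effect only
    features_list.append(v)
for v in targets.values():  # targets is a dict of tensors; keep it as a dict
    v.record_stream(torch.cuda.current_stream())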
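The same fix generalizes to nested batches (lists, tuples, dicts) with a small helper that calls record_stream() purely for its side effect and hands back the original container. A sketch: `record_stream_all` is a hypothetical name, and the non-CUDA check is there so the example also runs on a CPU-only machine, where there is nothing to record.

```python
import torch


def record_stream_all(obj, stream):
    """Mark every CUDA tensor in obj as used on `stream`; return obj unchanged.

    Tensor.record_stream() returns None, so its result is never assigned;
    the original tensors (and the list/dict holding them) are returned instead.
    """
    if isinstance(obj, torch.Tensor):
        if obj.is_cuda:
            obj.record_stream(stream)
    elif isinstance(obj, dict):
        for v in obj.values():
            record_stream_all(v, stream)
    elif isinstance(obj, (list, tuple)):
        for v in obj:
            record_stream_all(v, stream)
    return obj


stream = torch.cuda.current_stream() if torch.cuda.is_available() else None
features = [torch.zeros(2), torch.ones(2)]
targets = {"labels": torch.tensor([1, 2])}
features = record_stream_all(features, stream)
targets = record_stream_all(targets, stream)
```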