Lightning-AI / pytorch-lightning

Pretrain, finetune and deploy AI models on multiple GPUs, TPUs with zero code changes.
https://lightning.ai
Apache License 2.0

setting accumulate_grad_batches (accumulate_grad_steps) >1 with deepspeed plugin and use cpu offload will lead to model training incorrectly #6170

Closed hwade closed 3 years ago

hwade commented 3 years ago

🐛 Bug

  1. Setting accumulate_grad_batches (accumulate_grad_steps) > 1 with the deepspeed plugin makes the model params update far too slowly. The cause is a mismatch between ds.accumulate_grad_steps (deepspeed) and pl.accumulate_grad_batches (pytorch_lightning). pl.DeepSpeedPlugin does not allow ds.accumulate_grad_steps to be specified (via config file or hparams); it forces ds.accumulate_grad_steps to equal pl.accumulate_grad_batches. However, pl.Trainer and ds.engine each count accumulation steps on their own. For example, if I set pl.accumulate_grad_batches = 64, ds.accumulate_grad_steps is automatically set to 64 as well. pl.Trainer calls ds.engine.step() only after 64 steps, and each such call only does ds.engine.micro_steps += 1. So it takes 64 * 64 trainer steps before ds.engine triggers the optimizer to step once and update the model params. In addition, the loss passed to backward has been scaled twice (by 64 * 64) in this situation.

    code in deepspeed/runtime/engine.py

    class DeepSpeedEngine:
        ...
        def is_gradient_accumulation_boundary(self):
            return (self.micro_steps + 1) % self.gradient_accumulation_steps() == 0
        ...
        def step(self, lr_kwargs=None):
            ...
            # Update the model when we reach gradient accumulation boundaries
            if self.is_gradient_accumulation_boundary():
                if self.progressive_layer_drop:
                    self.progressive_layer_drop.update_state(self.global_steps)
                self._take_model_step(lr_kwargs)
            ...
            self.micro_steps += 1

    code in pytorch_lightning/trainer/training_loop.py

    class TrainLoop:
        ...
        def should_accumulate(self):
            # checks if backward or backward + optimizer step (via closure)
            accumulation_done = self._accumulated_batches_reached()
            is_final_batch = self._num_training_batches_reached()
            return not (accumulation_done or is_final_batch)
        ...
        def run_training_batch(...):
            ...
            if self.should_accumulate():
                # backward
            else:
                ...
                # actually call ds.engine.step() when using DeepSpeedPlugin
                self.optimizer_step(optimizer, opt_idx, batch_idx, train_step_and_backward_closure)
  2. To train with deepspeed ZeRO offload, I tried setting ds.accumulate_grad_steps = 1 manually before ds.DeepSpeedEngine.initialize() in pytorch_lightning/plugins/training_type/deepspeed.py (with pl.accumulate_grad_batches = 64). It seemed to work well at the start, and the loss started to drop a little faster (64 steps per model param update). However, strangely, the loss still dropped slowly after the warmup steps. Looking into ZeRO Offload and the deepspeed implementation, I found that the hparams setting is still wrong. The model does a forward and backward pass at every single step with a mini batch, and with cpu offload turned on the computed gradients are moved to the cpu device (actually into host memory). But because it hits the accumulation boundary at every training step (ds.accumulate_grad_steps=1 and allreduce=True), deepspeed sets and overwrites the buffer that stores the gradients from the gpu device in async_inplace_copy_grad_to_fp32_buffer_from_gpu(param), instead of accumulating into it. After every training batch (64), pl.trainer triggers ds.engine.step() once, which calls optimizer.step() and writes the partially updated params to the model.

    code in deepspeed/runtime/zero/stage2.py

    
    def copy_grads_in_partition(self, param):
    if self.cpu_offload:
        if self.gradient_accumulation_steps > 1:
            self.async_accumulate_grad_in_cpu_via_gpu(param)
    
        if self.is_gradient_accumulation_boundary:
            self.set_norm_for_param_grad_in_gpu(param)
    
            self.update_overflow_tracker_for_param_grad(param)
    
            self.async_inplace_copy_grad_to_fp32_buffer_from_gpu(param)
    
        return 
    ...
  3. Finally, I set pl.accumulate_grad_batches = 1 and ds.accumulate_grad_steps = 64. Though some problems remain (e.g. the pl.trainer progress bar display and a learning rate scheduler step mismatch), the model starts to train normally. The progress bar display issue is not a big deal, but the lr_scheduler step does slightly affect model training. The lr_scheduler problem is similar to the previous one: the trainer triggers lr_scheduler.step() once after every training batch (1 step), but ds.engine.optimizer updates the params later (every 64 steps). This makes the accumulated gradients a sum of gradients weighted by different learning rates. Thus, I overrode the optimizer_step() function and counted the lr_scheduler steps manually (without returning lr_scheduler from configure_optimizers()). The overridden optimizer_step() does not trigger the exception (raised by pytorch_lightning==v1.2.0) because pl.accumulate_grad_batches == 1 (it would be raised for values > 1), and the model works fine.

    ./MYCODE.py

    
    class MyPLModule(pl.LightningModule):
        ...
        def optimizer_step(self, epoch, batch_idx, optimizer,
                           optimizer_idx, optimizer_closure, on_tpu,
                           using_native_amp, using_lbfgs):
            if self.trainer.use_tpu or on_tpu:
                xm.optimizer_step(optimizer)
            else:
                optimizer.step(closure=optimizer_closure)
            optimizer.zero_grad()
            # step the scheduler manually, once per 64 micro-steps
            if self.mStep % 64 == 0:
                self.lr_scheduler.step()
            self.mStep += 1
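
The three configurations above can be compared with a small counter simulation. This is a minimal sketch, not DeepSpeed or Lightning code: `ToyEngine` and `batches_per_update` are hypothetical names, and the engine only mimics the `micro_steps` boundary check quoted from deepspeed/runtime/engine.py. It does not model the cpu offload gradient buffer from case 2, so it only shows how often params would be updated.

```python
class ToyEngine:
    """Hypothetical stand-in for the engine's accumulation counter."""

    def __init__(self, gradient_accumulation_steps):
        self.gradient_accumulation_steps = gradient_accumulation_steps
        self.micro_steps = 0
        self.param_updates = 0

    def is_gradient_accumulation_boundary(self):
        return (self.micro_steps + 1) % self.gradient_accumulation_steps == 0

    def step(self):
        # Params are only updated on an accumulation boundary,
        # but the counter advances on every call.
        if self.is_gradient_accumulation_boundary():
            self.param_updates += 1
        self.micro_steps += 1


def batches_per_update(trainer_accum, engine_accum, num_batches):
    """Simulate a trainer that calls engine.step() once every `trainer_accum` batches."""
    engine = ToyEngine(engine_accum)
    for batch_idx in range(num_batches):
        accumulation_done = (batch_idx + 1) % trainer_accum == 0
        if accumulation_done:
            engine.step()
    if engine.param_updates == 0:
        return None
    return num_batches // engine.param_updates


total = 64 * 64 * 2
print(batches_per_update(64, 64, total))  # 4096: both sides accumulate (case 1)
print(batches_per_update(64, 1, total))   # 64: only the trainer accumulates (case 2)
print(batches_per_update(1, 64, total))   # 64: only the engine accumulates (case 3)
```

Under these assumptions, only the first configuration needs 64 * 64 batches per update. Cases 2 and 3 both update every 64 batches, which is why the remaining difference between them has to come from how gradients are stored, not from the counters.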

Environment

tchaton commented 3 years ago

Dear @hwade,

Thanks a lot for such a detailed analysis.

@SeanNaren @hwade I am thinking the simplest solution would be to synchronise the deepspeed and Lightning internal counters as follows.

Any thoughts ?

                if self.should_accumulate():
                    # For gradient accumulation

                    # -------------------
                    # calculate loss (train step + train step end)
                    # -------------------

                    # automatic_optimization=True: perform dpp sync only when performing optimizer_step
                    # automatic_optimization=False: don't block synchronization here
                    with self.block_ddp_sync_behaviour():
                        self.training_step_and_backward(
                            split_batch, batch_idx, opt_idx, optimizer, self.trainer.hiddens
                        )

                    batch_outputs = self._process_closure_result(
                        batch_outputs=batch_outputs,
                        opt_idx=opt_idx,
                    )

            self.trainer.accelerator.batch_accumulation_end()

class DeepSpeedPrecisionPlugin(PrecisionPlugin):

    def __init__(self, precision):
        super().__init__()
        self.precision = precision

    def batch_accumulation_end(self):
        deepspeed_engine.step()

    def pre_optimizer_step(
        self, pl_module: LightningModule, optimizer: Optimizer, optimizer_idx: int, lambda_closure: Callable, **kwargs
    ) -> bool:
        deepspeed_engine = pl_module.trainer.model
        # DeepSpeed does not support closures.
        lambda_closure()

        if not pl_module.automatic_optimization:
            pl_module.trainer.call_hook("on_after_backward")

        deepspeed_engine.step()

        return False

SeanNaren commented 3 years ago

Thanks guys and thanks @hwade for such a detailed breakdown, was super helpful.

@tchaton's proposal is definitely viable, by having a nice hook that's called irrespective of accumulation, allowing deepspeed to increment internal counters.

However, an alternative is to check whether the training type plugin handles accumulation itself, in which case should_accumulate turns into a no-op. This means accumulation is handled internally in deepspeed at every step. This might avoid the need for two optimizer.step calls, but may be trickier and more involved to implement.

On balance, I think it's important to get this functionality working, so @tchaton's proposal is an awesome way to get there. We'll also need a test to ensure that the effective batch size is as expected.
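
As a rough illustration of such a test, the sketch below counts how many samples contribute to each param update when the engine sees every micro-batch, compared with the nested setup from the report. It is only a toy under stated assumptions: `CountingEngine`, `run_epoch` and the `engine_handles_accumulation` flag are hypothetical names and are not part of Lightning or DeepSpeed.

```python
class CountingEngine:
    """Hypothetical engine that records how many samples feed each update."""

    def __init__(self, gradient_accumulation_steps):
        self.gradient_accumulation_steps = gradient_accumulation_steps
        self.micro_steps = 0
        self.pending_samples = 0
        self.samples_per_update = []

    def backward(self, micro_batch_size):
        # Gradients from this micro-batch are added to the pending ones.
        self.pending_samples += micro_batch_size

    def step(self):
        if (self.micro_steps + 1) % self.gradient_accumulation_steps == 0:
            self.samples_per_update.append(self.pending_samples)
            self.pending_samples = 0
        self.micro_steps += 1


def run_epoch(engine, num_batches, micro_batch_size,
              engine_handles_accumulation, trainer_accum):
    for batch_idx in range(num_batches):
        engine.backward(micro_batch_size)
        # If the engine owns accumulation, the trainer-side check is a no-op.
        should_accumulate = (
            not engine_handles_accumulation
            and (batch_idx + 1) % trainer_accum != 0
        )
        if not should_accumulate:
            engine.step()
    return engine.samples_per_update


delegated = run_epoch(CountingEngine(4), 16, 8, True, 4)
nested = run_epoch(CountingEngine(4), 16, 8, False, 4)
print(delegated)  # [32, 32, 32, 32]
print(nested)     # [128]
```

With a micro-batch of 8 and 4 accumulation steps, the expected effective batch size is 32. The delegated run matches it, while the nested run only updates once with 128 samples, which is the kind of regression a real test would need to catch.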

hwade commented 3 years ago

@tchaton @SeanNaren Thanks for your replies. @tchaton's proposal to sync steps between the trainer and ds.optimizer seems to be the simplest way to make this work. However, I think it's not only the accumulation step that needs to be synced: the loss scaling op should also be reduced to a single one, because the loss propagated backward is scaled twice by the accumulation steps, once in training_step and once in deepspeed. I'm not sure whether any other operations are done twice, in the trainer and in deepspeed respectively, so I agree with @SeanNaren's suggestion to turn one of them into a no-op.
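
The effect of scaling the loss twice can be checked with plain arithmetic. The sketch below is an assumption-laden toy, not library code: it uses a hypothetical loss `w * x` per micro-batch (so the gradient with respect to `w` is `x`) and assumes both sides divide the loss by their own accumulation factor before backward.

```python
import math


def accumulated_grad(xs, trainer_accum, engine_accum):
    """Sum of per-micro-batch gradients of loss_i = w * x_i after both scalings."""
    grad = 0.0
    for x in xs:
        scale = 1.0 / trainer_accum  # trainer side: loss / accumulate_grad_batches
        scale /= engine_accum        # engine side: loss scaled again in backward()
        grad += x * scale            # d(loss_i * scale) / dw = x_i * scale
    return grad


xs = [2.0] * 64
mean_grad = sum(xs) / len(xs)

scaled_once = accumulated_grad(xs, trainer_accum=64, engine_accum=1)
scaled_twice = accumulated_grad(xs, trainer_accum=64, engine_accum=64)

print(math.isclose(scaled_once, mean_grad))        # True
print(math.isclose(scaled_twice, mean_grad / 64))  # True
```

With a single scaling, the accumulated gradient equals the mean over the 64 micro-batches. With both scalings it is 64 times smaller, which would look like a much lower effective learning rate.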

deepspeed/runtime/engine.py


    def backward(self, loss, allreduce_gradients=True, release_loss=False):
        r"""Execute backward pass on the loss
        Arguments:
            loss: Torch tensor on which to execute backward propagation
            allreduce_gradients: If this is False, then gradient averaging will be skipped. Default is True.
        """
        ...
        # scale loss w.r.t. gradient accumulation if needed
        if self.gradient_accumulation_steps() > 1:
            loss = self._scale_loss(loss.float())

pytorch_lightning/trainer/training_loop.py

```python
def training_step(self, split_batch, batch_idx, opt_idx, hiddens):
    ...
    if self.trainer.train_loop.automatic_optimization:
        # accumulate loss
        # (if accumulate_grad_batches = 1 no effect)
        if is_result_obj:
            closure_loss = training_step_output.minimize
        else:
            closure_loss = training_step_output.batch_loss

        closure_loss = closure_loss / self.trainer.accumulate_grad_batches

        # the loss will get scaled for amp. avoid any modifications to it
        untouched_loss = closure_loss.detach().clone()
```

tchaton commented 3 years ago

Dear @hwade,

Great catch ! You seem to have a good understanding of Lightning internals. Would love to see you making contributions :) Let me address those and ping you for review.

Best, T.C

hwade commented 3 years ago

@tchaton Thanks for the invitation. I would be very glad to contribute to Lightning. :)

SeanNaren commented 3 years ago

A fix that @tchaton was working on has made its way into master. Could you verify it works, @hwade?

hwade commented 3 years ago

@SeanNaren @tchaton Thanks for your work. I will try it and give feedback as soon as possible.

hwade commented 3 years ago

@SeanNaren @tchaton I have tested the DeepSpeedPlugin in branch V1.3.0rc1. lr_scheduler, the scaled loss and the accumulated gradients all work fine in my case. Looking into the implementation, the bug was fixed in a really simple way without too many changes. I really appreciate your work @tchaton

SeanNaren commented 3 years ago

Thanks @hwade :)