NVIDIA / Megatron-LM

Ongoing research training transformer models at scale
https://docs.nvidia.com/megatron-core/developer-guide/latest/user-guide/index.html#quick-start

[BUG] Zarr checkpoint loses distributed optimizer states due to lack of synchronizers on ranks that create arrays #1053

Open LouChao98 opened 2 months ago

LouChao98 commented 2 months ago

Describe the bug

When using a Zarr distributed checkpoint together with the distributed optimizer, each rank writes its optimizer state according to the ShardedTensor's flattened_range. The Zarr strategy relies on synchronizers to make these parallel writes safe, but the current implementation only sets a synchronizer on ranks that open an existing Zarr array, not on the ranks that create the arrays. Consequently, the writes from the creating ranks may be lost, leaving all zeros at the corresponding slices in the checkpoint.
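
For context, the failure mode can be reproduced outside Megatron with a small standalone script. The sketch below is illustrative only (it assumes the zarr v2 API that the strategy above uses, and the race is nondeterministic, so it may need several runs to show up): two processes write disjoint slices that land in the same chunk of one array. Without a ProcessSynchronizer, the chunk-level read-modify-write can race and one writer's data is silently dropped, which is exactly what can happen to the creating rank's slice of the flattened optimizer state.

import multiprocessing as mp

import numpy as np
import zarr

STORE = '/tmp/zarr_sync_demo'        # hypothetical paths for the demo only
SYNC = '/tmp/zarr_sync_demo.sync'

def write_slice(start, stop, use_sync):
    # Each process opens the same array and fills its own half of the single
    # 100-element chunk, mimicking two ranks writing one flattened_range each.
    sync = zarr.ProcessSynchronizer(SYNC) if use_sync else None
    arr = zarr.open(STORE, mode='r+', synchronizer=sync)
    arr[start:stop] = 1.0

if __name__ == '__main__':
    for use_sync in (False, True):
        zarr.create(shape=(100,), chunks=(100,), dtype='f4', fill_value=0.0,
                    store=STORE, overwrite=True)
        procs = [mp.Process(target=write_slice, args=(0, 50, use_sync)),
                 mp.Process(target=write_slice, args=(50, 100, use_sync))]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        written = np.count_nonzero(zarr.open(STORE, mode='r')[:])
        # Without the synchronizer this can print 50/100 (one half lost);
        # with it, 100/100.
        print(f'use_sync={use_sync}: {written}/100 elements written')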

To Reproduce

Run pretrain_gpt.py with DP > 1, TP > 1, and the following arguments:

--save {SOME_WHERE} \
--save-interval 10 \
--use-dist-ckpt \
--dist-ckpt-format zarr \
--use-distributed-optimizer \
--weight-decay 0.1  # ensure nonzero grads everywhere, making the test easy

Then a toy test inserted after the dist_checkpointing.save call in the following block may fail: https://github.com/NVIDIA/Megatron-LM/blob/86e2927edaa977f3e859d6f4b6d38a236114fd38/megatron/training/checkpointing.py#L405-L407

# load the checkpoint that was just saved and compare against the live optimizer state

state_dict_for_load = generate_state_dict(args, model, optimizer, opt_param_scheduler, rng_state,
                                           use_dist_ckpt, iteration, optim_sd_kwargs=optim_sd_kwargs)
load_strategy = get_default_load_sharded_strategy(checkpoint_name)
state_dict_loaded = dist_checkpointing.load(state_dict_for_load, checkpoint_name, load_strategy)

opt_state_correct = {key: deepcopy(value) for key, value in optimizer.optimizer.state.items()}
optimizer.load_state_dict(state_dict_loaded['optimizer'])
opt_state_loaded = optimizer.optimizer.state

for oi, opt in state_dict_loaded['optimizer']['param_state'].items():
    for key in ['exp_avg', 'exp_avg_sq']:
        # `state_dict` is the sharded state dict passed to dist_checkpointing.save above;
        # its ShardedTensor keys are only used to label the assertion message.
        if isinstance(state_dict['optimizer']['param_state'][oi][key], list):
            param_key = ';'.join([item.key for item in state_dict['optimizer']['param_state'][oi][key]])
        else:
            param_key = state_dict['optimizer']['param_state'][oi][key].key
        # After a few training steps it is very unlikely that a model or optimizer tensor
        # is exactly zero everywhere, so an all-zero tensor indicates lost writes.
        assert opt[key].abs().sum() != 0, param_key

Adding a barrier under the following line and using a larger DP size may increase the probability of reproducing the failure: https://github.com/NVIDIA/Megatron-LM/blob/86e2927edaa977f3e859d6f4b6d38a236114fd38/megatron/core/dist_checkpointing/strategies/zarr.py#L64
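
Concretely, the barrier is just the following (a sketch; the surrounding code at that line is not reproduced here):

import torch

# Inserted in megatron/core/dist_checkpointing/strategies/zarr.py directly under the
# line referenced above, so that all ranks reach the concurrent write phase together
# and the lost-write race becomes more likely to trigger.
torch.distributed.barrier()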

Expected behavior

All tensors should be written to the disk.

Proposed fix

Set synchronizers when creating Zarr arrays, mirroring the logic used when opening existing Zarr arrays:

        arr = zarr.create(
            sharded_tensor.global_shape,
            dtype=np_dtype,
            store=checkpoint_dir / sharded_tensor.key,
            chunks=sharded_tensor.max_allowed_chunks(),
            compressor=None,
            fill_value=None,
            write_empty_chunks=True,
            synchronizer=(  # add this
                zarr.ProcessSynchronizer(str(checkpoint_dir / f'{sharded_tensor.key}.sync'))   # add this
                if sharded_tensor.flattened_range is not None   # add this
                else None   # add this
            ),
        )
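
As in the open path, the ProcessSynchronizer presumably only needs to be attached when flattened_range is set: that is the case in which several ranks write disjoint slices into the same array (and potentially the same chunk), so the chunk-level read-modify-write has to be serialized through file locks. For regular, non-flattened shards each chunk is written by a single rank and no file-based locking is needed.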

TissueC commented 2 months ago

Same here. This issue can be pretty serious, and needs to be fixed very soon.

mikolajblaz commented 2 months ago

I confirm this is a bug and your fix looks relevant, thanks :+1:

Please note that the zarr format is being deprecated and, in particular, does not play well with the DistributedOptimizer, so I suggest switching the checkpoint format to --dist-ckpt-format torch_dist.
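
For example, the repro arguments from the report above would keep everything except the format flag (a sketch, using the same flags as in the original report):

--save {SOME_WHERE} \
--save-interval 10 \
--use-dist-ckpt \
--dist-ckpt-format torch_dist \
--use-distributed-optimizer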

santha96 commented 2 months ago

Does this problem occur only when Tensor Parallelism (TP) > 1 and Data Parallelism (DP) > 1? Currently, I am using DistributedOptimizer with TP = 1 and DP > 1. Will storing checkpoints in Zarr format cause an issue?

mikolajblaz commented 2 months ago

> Does this problem occur only when Tensor Parallelism (TP) > 1 and Data Parallelism (DP) > 1? Currently, I am using DistributedOptimizer with TP = 1 and DP > 1. Will storing checkpoints in Zarr format cause an issue?

With TP=1 it might be an issue as well; please use --dist-ckpt-format torch_dist.

TissueC commented 2 months ago

> Does this problem occur only when Tensor Parallelism (TP) > 1 and Data Parallelism (DP) > 1? Currently, I am using DistributedOptimizer with TP = 1 and DP > 1. Will storing checkpoints in Zarr format cause an issue?

It is okay with TP = 1 and DP > 1 in my environment.

github-actions[bot] commented 3 days ago

Marking as stale. No activity in 60 days.

Jayoprell commented 1 day ago

Same issue with "--use-distributed-optimizer --ckpt-format torch". Megatron: core_r0.9.0, 1afee592e85ac7994887eb5f4ef3998f76384333

mikolajblaz commented 1 day ago

> Same issue with "--use-distributed-optimizer --ckpt-format torch". Megatron: core_r0.9.0, 1afee59

@Jayoprell this one is not expected; can you elaborate on the symptoms? With --ckpt-format torch we don't do any file-based synchronization.

Jayoprell commented 1 day ago

When using --use-distributed-optimizer with --ckpt-format torch, save_checkpoint has only DP rank 0 gather the optimizer state from all other ranks and save it to a file, so there should not be any synchronization problem, right?

The error info:

================== tensor keys: dict_keys(['param']), dp rank: 0, optim_state:{}, main_param:tensor([0., 0., 0.,  ..., 0., 0., 0.], ) ================
Traceback (most recent call last):
  File "/workspace/Megatron-LM/pretrain_gpt.py", line 264, in <module>
    pretrain(
  File "/workspace/Megatron-LM/megatron/training/training.py", line 349, in pretrain
    iteration, num_floating_point_operations_so_far = train(
  File "/workspace/Megatron-LM/megatron/training/training.py", line 1366, in train
    save_checkpoint_and_time(iteration, model, optimizer,
  File "/workspace/Megatron-LM/megatron/training/training.py", line 1070, in save_checkpoint_and_time
    save_checkpoint(iteration, model, optimizer, opt_param_scheduler,
  File "/workspace/Megatron-LM/megatron/training/checkpointing.py", line 380, in save_checkpoint
    optimizer.save_parameter_state(optim_checkpoint_name)
  File "/workspace/Megatron-LM/megatron/core/optimizer/distrib_optimizer.py", line 902, in save_parameter_state
    state_dict = self.get_parameter_state_dp_zero()
  File "/workspace/Megatron-LM/megatron/core/optimizer/distrib_optimizer.py", line 852, in get_parameter_state_dp_zero
    tensors[key].detach().cpu()
KeyError: 'exp_avg'

The tensors above are the optimizer parameter state, and it seems the optimizer state is empty.
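
A hypothetical sanity check (not part of Megatron; it reuses the optimizer.optimizer access pattern from the toy test above) can be dropped in right before save_checkpoint to confirm whether the inner optimizer state is populated at all. In plain torch.optim.Adam the exp_avg / exp_avg_sq buffers are only created on the first successful step(), so an empty state here would point at the training step rather than at checkpointing:

# Hypothetical check, inserted before save_checkpoint in the training loop.
# `optimizer` is the Megatron optimizer wrapper; `optimizer.optimizer` is the
# underlying torch optimizer.
missing = []
for group in optimizer.optimizer.param_groups:
    for param in group['params']:
        state = optimizer.optimizer.state.get(param, {})
        if 'exp_avg' not in state or 'exp_avg_sq' not in state:
            missing.append(tuple(param.shape))
assert not missing, f'optimizer state missing for {len(missing)} params, e.g. shapes {missing[:3]}'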