huggingface / accelerate


Data on the same dp rank is inconsistent after prepare #3092

Open v4if opened 1 week ago

v4if commented 1 week ago

System Info

- `Accelerate` version: 0.34.0
- Platform: Linux-5.10.134-13.al8.x86_64-x86_64-with-glibc2.31
- `accelerate` bash location: /root/miniconda/envs/flux_diffuser/bin/accelerate
- Python version: 3.10.14
- Numpy version: 2.1.0
- PyTorch version (GPU?): 2.4.0+cu121 (True)
- PyTorch XPU available: False
- PyTorch NPU available: False
- PyTorch MLU available: False
- PyTorch MUSA available: False
- System RAM: 1960.00 GB
- GPU type: NVIDIA A800-SXM4-80GB
- `Accelerate` default config:
        Not found

Reproduction

Run with `torchrun --nproc_per_node=2 --master_port=1234 sample.py`:

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from accelerate import Accelerator

accelerator = Accelerator()
rank = accelerator.process_index

batch_size = 4
data_parallel_size = 1
data_parallel_rank = 0
dataset = list(range(8))
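# The two launched processes are meant to form a single data-parallel replica
# (the other parallel dimension would be tensor/sequence parallelism), so both
# ranks are expected to receive identical batches.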

def test_torch_dataloader():
    sampler = DistributedSampler(
        dataset,
        num_replicas=data_parallel_size,
        rank=data_parallel_rank,
        shuffle=False,
    )
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler,
    )
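    # With num_replicas=1 and rank=0, this sampler yields the full dataset on
    # every process, so the loop below prints identical batches on both ranks.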

    for epoch, batch in enumerate(dataloader):
        print(f"torch dataloader. epoch:{epoch}, rank:{rank}, batch: {batch}")

def test_accelerator_dataloader():
    sampler = DistributedSampler(
        dataset,
        num_replicas=data_parallel_size,
        rank=data_parallel_rank,
        shuffle=False,
    )
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler,
    )
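    # prepare_data_loader re-wraps this DataLoader for the two launched
    # processes; as reported below, each rank then prints different batches.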

    dataloader = accelerator.prepare_data_loader(dataloader)
    for epoch, batch in enumerate(dataloader):
        print(
            f"accelerator_dataloader. epoch:{epoch}, rank:{rank}, batch: {batch}")

# test_torch_dataloader()
test_accelerator_dataloader()

Expected behavior

During model pre-training the model is parallelized with tensor parallelism and sequence parallelism. Under model parallelism, every rank within the same data-parallel replica must sample the same data, so a DistributedSampler is passed to the torch DataLoader to guarantee that each replica sees identical batches. The plain torch DataLoader test (test_torch_dataloader) behaves as expected: with world_size=2, data_parallel_size=1, and data_parallel_rank=0, every rank prints the same batches at every step. But after accelerator.prepare_data_loader (test_accelerator_dataloader), the batches obtained on each rank are inconsistent.
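For context, here is a minimal sketch of how the data-parallel coordinates used in the script could be derived when a tensor/sequence-parallel group spans both launched processes (the tp_size name and the arithmetic are illustrative assumptions, not part of the script above):

world_size = 2                              # torchrun --nproc_per_node=2
tp_size = 2                                 # assumed: both GPUs shard one model replica
data_parallel_size = world_size // tp_size  # -> 1
data_parallel_rank = 0                      # global_rank // tp_size is 0 for ranks 0 and 1

With these values every global rank maps to the same data-parallel replica, which is why both ranks are expected to see identical batches.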

torch dataloader output: [screenshot]
accelerator dataloader output: [screenshot]

muellerzr commented 1 week ago

Accelerate's dataloader works very differently: we specifically do not duplicate data across ranks, and each process receives a different shard of the dataset instead. See this visualization: https://www.youtube.com/watch?v=9Vfauv4ErwA&pp=ygUhaHVnZ2luZ2ZhY2UgYWNjZWxlcmF0ZSBkYXRhbG9hZGVy
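A common workaround when every rank of a model-parallel group must see identical batches is to keep that DataLoader out of prepare, so Accelerate never re-shards it, and move batches to the device manually. A minimal sketch under the same setup as the reproduction script (this is a workaround, not an official Accelerate option for duplicating data across ranks):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from accelerate import Accelerator

accelerator = Accelerator()

batch_size = 4
data_parallel_size = 1
data_parallel_rank = 0
dataset = list(range(8))

# Same sampler/dataloader as in the reproduction, but NOT passed to
# accelerator.prepare / prepare_data_loader, so each process keeps the
# replica-level sharding defined by the DistributedSampler.
sampler = DistributedSampler(dataset, num_replicas=data_parallel_size,
                             rank=data_parallel_rank, shuffle=False)
dataloader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)

for step, batch in enumerate(dataloader):
    batch = batch.to(accelerator.device)  # move data to the local device manually
    ...

The model and optimizer can still go through accelerator.prepare; only this dataloader is left unprepared.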