huggingface / accelerate

🚀 A simple way to launch, train, and use PyTorch models on almost any device and distributed configuration, automatic mixed precision (including fp8), and easy-to-configure FSDP and DeepSpeed support
https://huggingface.co/docs/accelerate
Apache License 2.0

dataloader doesn't load data while gpu is training #3168

Open · geekifan opened this issue 1 week ago

geekifan commented 1 week ago

System Info

Copy-and-paste the text below in your GitHub issue

- `Accelerate` version: 1.0.0
- Platform: Linux-6.10.11-amd64-x86_64-with-glibc2.40
- `accelerate` bash location: /disk/zdata0/home/xuyifan/anaconda3/envs/llava-transformers/bin/accelerate
- Python version: 3.11.9
- Numpy version: 1.26.4
- PyTorch version (GPU?): 2.4.0 (True)
- PyTorch XPU available: False
- PyTorch NPU available: False
- PyTorch MLU available: False
- PyTorch MUSA available: False
- System RAM: 503.67 GB
- GPU type: NVIDIA RTX A6000
- `Accelerate` default config:
        - compute_environment: LOCAL_MACHINE
        - distributed_type: DEEPSPEED
        - mixed_precision: bf16
        - use_cpu: False
        - debug: False
        - num_processes: 8
        - machine_rank: 0
        - num_machines: 1
        - rdzv_backend: static
        - same_network: True
        - main_training_function: main
        - enable_cpu_affinity: False
        - deepspeed_config: {'gradient_accumulation_steps': 1, 'offload_optimizer_device': 'none', 'offload_param_device': 'none', 'zero3_init_flag': False, 'zero3_save_16bit_model': False, 'zero_stage': 3}
        - downcast_bf16: no
        - tpu_use_cluster: False
        - tpu_use_sudo: False
        - tpu_env: []

Reproduction

I use a PyTorch dataloader with the Trainer from transformers. Since accelerate is the dataloader backend of the Trainer, I think the problem is caused by accelerate.
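
Since accelerate wraps the Trainer's dataloader, the behavior should be reproducible outside of Trainer as well. A minimal sketch of what I mean (not my actual code; it reuses the DatasetForOfflineDistill and custom_collate_fn shown below with the same worker/prefetch settings, and the simulated step time is a placeholder):

# Minimal sketch: wrap a plain PyTorch DataLoader with Accelerator.prepare(),
# using the same worker/prefetch settings as the Trainer run below.
# DatasetForOfflineDistill / custom_collate_fn are the classes from this issue;
# everything else is standard torch / accelerate API.
import time
from torch.utils.data import DataLoader
from accelerate import Accelerator

def check_dataloader(dataset, collate_fn):
    accelerator = Accelerator()
    loader = DataLoader(
        dataset,
        batch_size=64,                # micro_batch_size in the script below
        num_workers=32,
        pin_memory=True,
        prefetch_factor=10,
        persistent_workers=True,
        collate_fn=collate_fn,
    )
    loader = accelerator.prepare(loader)  # same wrapping the Trainer applies
    last = time.perf_counter()
    for step, batch in enumerate(loader):
        now = time.perf_counter()
        accelerator.print(f"step {step}: waited {now - last:.3f}s for a batch")
        time.sleep(0.5)               # stand-in for the GPU forward/backward
        last = time.perf_counter()
        if step >= 100:
            break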

I use a simple training script to distill a CLIP model. Part of my code:

def train(
    # model/data params
    teacher_type: str = "",
    student_type: str = "",
    student_model: str = "checkpoints/clip-vit-base-patch16",
    from_pretrained: bool = True,
    data_config: str = "data.config",
    train_dataset: str = "msrvtt_train_single",
    num_frames: int = 8,
    mask_ratio: float = 0.8,
    output_dir: str = "./model",
    # training hyperparams
    batch_size: int = 256,
    micro_batch_size: int = 64,
    num_epochs: int = 1,
    learning_rate: float = 5e-4,
    warmup_ratio: float = 0.5,
    cutoff_len: int = 32,
    # llm hyperparams
    run_name: str = None,
    save_steps: int = 100,
    seed: int = 42,
    deepspeed: str = None,
    logging_steps: int = 10,
    grad_checkpoint: bool = False,
    # system_prompt: str = None,
    bf16: bool = False,
    # to avoid fire error
    local_rank: int = 0,
):
    # save args
    args = locals()
    if int(os.environ.get("LOCAL_RANK") or 0) == 0:
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        with open(os.path.join(output_dir, "args.json"), "w") as f:
            json.dump(args, f, indent=2)

    # set NCCL_DEBUG

    gradient_accumulation_steps = batch_size // micro_batch_size

    device_map = "cuda"
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    ddp = world_size != 1
    gradient_accumulation_steps = gradient_accumulation_steps // world_size
    # # if ddp and False:
    # if ddp:
    #     device_map = {"": int(os.environ.get("LOCAL_RANK") or 0)}
    #     gradient_accumulation_steps = gradient_accumulation_steps // world_size
    #     torch.distributed.init_process_group("nccl")
    #     rank, world_size = torch.distributed.get_rank(), torch.distributed.get_world_size()
    #     device_id = rank % torch.cuda.device_count()
    #     device = torch.device(device_id)
    #     torch.cuda.set_device(device)

    set_seed(seed)

    data_config = load_dataset_config(data_config, train_dataset)

    # initialize tokenizer
    tokenizer = CLIPTokenizer.from_pretrained(student_model)

    # initialize model
    feat = torch.load(data_config['feat_paths'][teacher_type], map_location='cpu')
    teacher_text_dim = feat[0]['text_embeds'].shape[-1]
    teacher_vision_dim = feat[0]['video_embeds'].shape[-1]
    config = CLIPConfigForDistill.from_pretrained(student_model)
    config.update({
        "teacher_text_dim": teacher_text_dim, 
        "teacher_vision_dim": teacher_vision_dim,
        "student_text_dim": config.text_config.projection_dim, 
        "student_image_dim": config.vision_config.projection_dim,
        "max_token_len": cutoff_len,
    })
    del feat

    model = CLIPModelForDistill(config)
    print(model.num_parameters(only_trainable=True))

    if grad_checkpoint:
        model.enable_input_require_grads()

    # initialize dataset
    train_data = DatasetForOfflineDistill(
        anno_path=data_config['anno_path'],
        data_root=data_config['data_root'],
        feat_path=data_config['feat_paths'][teacher_type],
        tokenize=False,
        num_frames=num_frames,
    )

    def custom_collate_fn(batch):
        # batch is a list of dicts
        # now = datetime.now()
        # dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
        # print(f'[{dt_string} {os.getpid()}] Rank {rank} is collating')
        collated_batch = {}
        for key in batch[0].keys():
            collated_batch[key] = [b[key] for b in batch]
        # collated_batch['video'] is a list of [num_frames, 3, 224, 224]
        # collated_batch['caption'] is a list of strings
        tokenized_caption = tokenizer(collated_batch['caption'], padding=True, return_tensors="pt")
        collated_batch['input_ids'] = tokenized_caption['input_ids']
        collated_batch['attention_mask'] = tokenized_caption['attention_mask']
        collated_batch['pixel_values'] = torch.stack(collated_batch['video'])
        collated_batch['vision_embeds'] = torch.stack(collated_batch['vision_embeds'])
        collated_batch['text_embeds'] = torch.stack(collated_batch['text_embeds'])
        return collated_batch

    trainer = TrainerForDistill(
        model=model,
        train_dataset=train_data,
        mask_ratio=mask_ratio,
        args=transformers.TrainingArguments(
            per_device_train_batch_size=micro_batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            warmup_ratio=warmup_ratio,
            num_train_epochs=num_epochs,
            learning_rate=learning_rate,
            fp16=not bf16,
            bf16=bf16,
            logging_steps=logging_steps,
            save_strategy="steps",
            eval_steps=None,
            save_steps=save_steps,
            output_dir=output_dir,
            save_total_limit=1,
            load_best_model_at_end=False,
            ddp_find_unused_parameters=True if ddp else None,
            run_name=run_name,
            report_to=None,
            deepspeed=deepspeed,
            gradient_checkpointing=grad_checkpoint,
            remove_unused_columns=False,
            dataloader_num_workers=32,
            dataloader_pin_memory=True,
            dataloader_prefetch_factor=10,
            dataloader_persistent_workers=True,
        ),
        data_collator=custom_collate_fn,
    )

    if torch.__version__ >= "2" and sys.platform != "win32":
        model = torch.compile(model)

    trainer.train()

    model.save_pretrained(output_dir)
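
For reference on the dataloader settings above: each PyTorch DataLoader worker keeps prefetch_factor batches in flight, so every rank buffers up to num_workers * prefetch_factor batches while iterating. A quick back-of-the-envelope check with the values from the script:

# Prefetch-depth arithmetic for the TrainingArguments above (per training process).
num_workers = 32
prefetch_factor = 10
micro_batch_size = 64

buffered_batches = num_workers * prefetch_factor          # 320 batches queued ahead
buffered_samples = buffered_batches * micro_batch_size    # 20480 decoded video clips held in host RAM
print(buffered_batches, buffered_samples)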

The code of dataloader:

import torch
import os
import json
from utils.video import read_frames_decord
from torchvision.transforms.v2 import Compose, Resize, CenterCrop, RandomResizedCrop, RandomHorizontalFlip, ToTensor, Normalize
from PIL import Image
from datetime import datetime

class DatasetForOfflineDistill(torch.utils.data.Dataset):
    def __init__(
            self, 
            anno_path: str | os.PathLike, 
            data_root: str | os.PathLike,
            feat_path: str | os.PathLike,
            tokenizer: torch.nn.Module | None = None,
            tokenize: bool = False,
            num_frames: int = 8,
            test: bool = False
        ):
        with open(anno_path) as f:
            self.anno = json.load(f)
        self.data_root = data_root
        # keys of each item: idx, text_embeds, video_embeds
        self.feat = torch.load(feat_path, weights_only=True)
        self.num_frames = num_frames
        self.transforms = self.build_transforms(test)
        self.tokenizer = tokenizer
        self.tokenize = tokenize

    def build_transforms(self, test: bool):
        image_mean =  [0.48145466, 0.4578275, 0.40821073]
        image_std = [0.26862954, 0.26130258, 0.27577711]
        size = 224
        normalize = Normalize(mean=image_mean, std=image_std)
        train_transforms = Compose([
            RandomResizedCrop(size),
            RandomHorizontalFlip(),
            ToTensor(),
            normalize,
        ])
        val_transforms = Compose([
            Resize(size),
            CenterCrop(size),
            ToTensor(),
            normalize,
        ])
        if test:
            return val_transforms
        return train_transforms

    def __len__(self):
        return len(self.anno)

    def __getitem__(self, idx):
        rank = int(os.environ.get("LOCAL_RANK") or 0)
        now = datetime.now()
        dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
        print(f'[{dt_string} {os.getpid()}] Rank {rank} is loading', idx)
        item = self.feat[idx]
        anno_idx = item['idx']
        # [teacher_dim] -> [1, teacher_dim]
        text_embeds = item['text_embeds']
        video_embeds = item['video_embeds']
        caption = self.anno[anno_idx]['caption']
        if self.tokenizer is not None and self.tokenize:
            tokenized_caption = self.tokenizer(caption)
            caption = {
                'input_ids': tokenized_caption['input_ids'],
                'attention_mask': tokenized_caption['attention_mask'],
            }
        video_path = os.path.join(self.data_root, self.anno[anno_idx]['video'])
        # video = read_frames_decord(video_path, num_frames=self.num_frames).numpy()
        # frames = [self.transforms(Image.fromarray(frame)) for frame in video]
        video = read_frames_decord(video_path, num_frames=self.num_frames).permute(0, 3, 1, 2).float() / 255.0
        frames = self.transforms(video)
        return {
            'caption': caption, 
            'video': frames, 
            'text_embeds': text_embeds, 
            'vision_embeds': video_embeds
        }
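
To measure how expensive a single sample is on its own (decord decode plus the torchvision transforms), independent of any DataLoader or Trainer, I would use something like the loop below; the constructor arguments are placeholders for my real paths:

# Hypothetical standalone timing of DatasetForOfflineDistill.__getitem__;
# the paths below are placeholders, not my real config values.
import time

dataset = DatasetForOfflineDistill(
    anno_path="anno.json",          # placeholder
    data_root="videos/",            # placeholder
    feat_path="teacher_feats.pt",   # placeholder
    num_frames=8,
)

times = []
for i in range(32):
    t0 = time.perf_counter()
    _ = dataset[i]                  # triggers video decode + transforms
    times.append(time.perf_counter() - t0)

print(f"mean per-sample time: {sum(times) / len(times):.3f}s, max: {max(times):.3f}s")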

The dataloader only loads data in bursts, roughly once every 1/100 of the total steps. For example, if I train for 4500 steps, the dataloader first fetches enough data, then stops fetching while the GPUs start training. After about 45 steps the GPUs hang and the dataloader starts fetching again. GPU utilization is very low because of this, and I think it is a bug (or maybe a designed feature?).
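
To line the worker prints up with when the GPUs actually step, a sketch of a timing callback (TrainerCallback and add_callback are standard transformers APIs; the logging itself is just illustrative):

# Sketch of a step-timing callback to correlate with the __getitem__ prints above.
import time
from transformers import TrainerCallback

class StepTimerCallback(TrainerCallback):
    def __init__(self):
        self.last = None

    def on_step_end(self, args, state, control, **kwargs):
        now = time.perf_counter()
        if self.last is not None:
            print(f"[rank {args.local_rank}] step {state.global_step} took {now - self.last:.3f}s")
        self.last = now

# trainer.add_callback(StepTimerCallback())  # registered before trainer.train()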

Expected behavior

The dataloader should continue to fetch data while the GPUs are training, so that the GPUs never stall waiting for data.
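
A generic sketch of the behavior I would expect (not an accelerate or transformers feature, just a plain thread + queue that keeps draining the dataloader while the GPU is busy):

# Illustrative background prefetcher: a daemon thread keeps pulling batches
# from an existing dataloader into a bounded queue so the next batches are
# already on the host while the GPU runs the current step.
import queue
import threading

class BackgroundPrefetcher:
    def __init__(self, loader, max_prefetch=8):
        self.loader = loader
        self.queue = queue.Queue(maxsize=max_prefetch)
        self.thread = threading.Thread(target=self._fill, daemon=True)
        self.thread.start()

    def _fill(self):
        for batch in self.loader:
            self.queue.put(batch)
        self.queue.put(None)  # sentinel: end of epoch

    def __iter__(self):
        while True:
            batch = self.queue.get()
            if batch is None:
                return
            yield batch

# usage sketch: for batch in BackgroundPrefetcher(train_dataloader): ...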

bghira commented 5 days ago

I think the GPU blocks the Python global interpreter lock (GIL).