huggingface / accelerate

🚀 A simple way to launch, train, and use PyTorch models on almost any device and distributed configuration, automatic mixed precision (including fp8), and easy-to-configure FSDP and DeepSpeed support
https://huggingface.co/docs/accelerate
Apache License 2.0

Fine-tuning only doesn't work with "basic" distributed settings #2458

Closed ccruttjr closed 5 months ago

ccruttjr commented 7 months ago

System Info

- `Accelerate` version: 0.25.0
- Platform: Linux-6.5.0-18-generic-x86_64-with-glibc2.35
- Python version: 3.11.5
- Numpy version: 1.26.3
- PyTorch version (GPU?): 2.1.2 (True)
- PyTorch XPU available: False
- PyTorch NPU available: False
- System RAM: 31.25 GB
- GPU type: NVIDIA GeForce RTX 3090 (2 of them)

Reproduction

Accelerate works when I use non-distributed training, any DeepSpeed setting, or FSDP. It does not, however, work when I just select multi-GPU and leave all settings at their defaults. The processes seem to be running out of VRAM, even though there should be PLENTY of space. Here are the YAML config files that worked/didn't work, followed by the code and the error statement. I tried it with and without NCCL_P2P_DISABLE=1 to see if that changed anything, but to no avail. Also, jeez, running it solo is so much faster haha. I'd love to find what the issue is. I don't seem to be using up all my CPU RAM or processing power, and running it solo doesn't even use half of what I need according to nvidia-smi and accelerate estimate-memory with TinyLlama.
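One way to sanity-check the "plenty of space" expectation is a back-of-envelope estimate of per-GPU memory for full fine-tuning with AdamW. The sketch below is only a rough estimate under assumptions that are not stated in the issue: about 1.1e9 parameters, weights kept in fp32 under bf16 autocast, and DDP holding gradient buckets roughly the size of the gradients. Activations and CUDA overhead are ignored, so real numbers in nvidia-smi may differ, and this does not establish the cause of the failure.

```python
# Rough per-GPU memory estimate for full fine-tuning with AdamW.
# Assumptions (hypothetical, not taken from the issue): ~1.1e9 parameters,
# fp32 weights/gradients/optimizer states, and DDP gradient buckets that are
# about as large as the gradients themselves. Activations are not counted.

BYTES_FP32 = 4
GIB = 1024 ** 3


def estimate_gib(num_params: float, ddp: bool) -> float:
    weights = num_params * BYTES_FP32
    grads = num_params * BYTES_FP32
    adam_states = 2 * num_params * BYTES_FP32  # exp_avg and exp_avg_sq
    ddp_buckets = grads if ddp else 0  # extra gradient copy used for all-reduce
    return (weights + grads + adam_states + ddp_buckets) / GIB


num_params = 1.1e9
single = estimate_gib(num_params, ddp=False)
multi = estimate_gib(num_params, ddp=True)
print(f"single GPU: {single:.1f} GiB, DDP per GPU: {multi:.1f} GiB (excluding activations)")
```

Under these assumptions the DDP figure sits noticeably closer to the 24 GB of an RTX 3090 than the single-GPU figure, which would leave less headroom for activations.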

non-distributed (works)

compute_environment: LOCAL_MACHINE
debug: false
distributed_type: 'NO'
downcast_bf16: 'no'
gpu_ids: all
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 1
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false

base-distributed (doesn't work)

compute_environment: LOCAL_MACHINE
debug: false
distributed_type: MULTI_GPU
downcast_bf16: 'no'
gpu_ids: all
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
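For reference, the working non-distributed config and the failing MULTI_GPU config differ in only two keys. A minimal sketch that makes the difference explicit, with the values copied by hand from the two YAML files above into plain dicts (the helper name `diff_configs` is made up for illustration):

```python
# Keys shared by both configs, copied from the YAML above.
base = {
    "compute_environment": "LOCAL_MACHINE",
    "debug": False,
    "downcast_bf16": "no",
    "gpu_ids": "all",
    "machine_rank": 0,
    "main_training_function": "main",
    "mixed_precision": "bf16",
    "num_machines": 1,
    "rdzv_backend": "static",
    "same_network": True,
    "tpu_env": [],
    "tpu_use_cluster": False,
    "tpu_use_sudo": False,
    "use_cpu": False,
}
non_distributed = {**base, "distributed_type": "NO", "num_processes": 1}
multi_gpu = {**base, "distributed_type": "MULTI_GPU", "num_processes": 2}


def diff_configs(a: dict, b: dict) -> dict:
    """Return {key: (value_in_a, value_in_b)} for every key whose value differs."""
    keys = sorted(a.keys() | b.keys())
    return {k: (a.get(k), b.get(k)) for k in keys if a.get(k) != b.get(k)}


changed = diff_configs(non_distributed, multi_gpu)
for key, (old, new) in changed.items():
    print(f"{key}: {old!r} -> {new!r}")
```

So everything else (mixed precision, GPU selection, rendezvous backend) is identical between the run that works and the one that does not.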

DeepSpeed ZeRO stage 0 (works)

compute_environment: LOCAL_MACHINE
debug: false
deepspeed_config:
  gradient_accumulation_steps: 1
  zero3_init_flag: false
  zero_stage: 0
distributed_type: DEEPSPEED
downcast_bf16: 'no'
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false

FSDP (works)

compute_environment: LOCAL_MACHINE
debug: false
distributed_type: FSDP
downcast_bf16: 'no'
fsdp_config:
  fsdp_auto_wrap_policy: NO_WRAP
  fsdp_backward_prefetch_policy: BACKWARD_PRE
  fsdp_cpu_ram_efficient_loading: true
  fsdp_forward_prefetch: false
  fsdp_offload_params: false
  fsdp_sharding_strategy: 2
  fsdp_state_dict_type: SHARDED_STATE_DICT
  fsdp_sync_module_states: true
  fsdp_use_orig_params: true
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false

Here's the code.

import argparse
from time import time

import torch
from accelerate import Accelerator
from datasets import Dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
    set_seed,
)

# This allows adjusting training arguments without needing to change the code
def parse_args():
    parser = argparse.ArgumentParser(description="Training script arguments.")
    parser.add_argument("--batch_size", type=int, default=1,
                        help="Batch size for training.")
    parser.add_argument("--mixed_precision", type=str,
                        default="bf16", help="Mixed precision type.")
    parser.add_argument("--lr", type=float, default=5e-5,
                        help="Learning rate.")
    parser.add_argument("--num_epochs", type=int, default=3,
                        help="Number of training epochs.")
    parser.add_argument("--seed", type=int, default=None, help="Random seed.")
    parser.add_argument("--num_warmup_steps", type=int,
                        default=100, help="Number of warm-up steps.")
    parser.add_argument("--num_processes", type=int,
                        default=2, help="Number of gpus to use.")
    parser.add_argument("--model_name", type=str,
                        default="TinyLlama/TinyLlama-1.1B-Chat-v1.0", help="Model to use.")
    parser.add_argument("--data_location", type=str,
                        default="examples/preprocessed_data.json", help="File location for data.")
    parser.add_argument("--save_location", type=str,
                        default="saved_1000", help="Directory to save the tokenizer and fine-tuned model.")
    parser.add_argument("--gradient_accumulation_steps",
                        type=int, default=1, help="Gradient accumulation steps.")
    return parser.parse_args()

def process_dataset(json_file, tokenizer):

    ds = Dataset.from_json(json_file)

    def transform_example(example):
        # Construct system message
        system_message = f"Consult ID: {example['CONSULTID']}. Patient's age: {example['AGE_AT_CONSULT']}. Gender: {example['GENDER']}. Diagnosis Code: {example['DIAGNOSIS_CODE']}."

        # Construct messages in the required format
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": example["PCP_MESSAGE"]},
            {"role": "assistant", "content": example["SR_MESSAGE"]}
        ]

        return messages

    ds = ds.map(lambda x: {"formatted_chat": tokenizer.apply_chat_template(transform_example(x), tokenize=False, add_generation_prompt=False)})

    return ds

def get_dataloaders(accelerator: Accelerator, batch_size, model_name, data_location, save_location):
    # 1. Initialize tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    # 2. Convert JSON to readable dataset
    with accelerator.main_process_first():
        dataset = process_dataset(data_location, tokenizer)
        accelerator.print(dataset["formatted_chat"][0])

    # 3. Tokenize the formatted chats
    def tokenize_function(examples):
        # Tokenize, pad and truncate the 'formatted_chat' content
        return tokenizer(examples["formatted_chat"], padding="max_length", truncation=True, max_length=128)

    with accelerator.main_process_first():
        tokenized_dataset = dataset.map(tokenize_function, batched=True)

    tokenized_dataset.set_format(
        "torch", columns=["input_ids", "attention_mask"])

    # 4. Split into training and evaluation sets
    split_datasets = tokenized_dataset.train_test_split(test_size=0.2)
    tokenized_train_dataset = split_datasets["train"]
    tokenized_eval_dataset = split_datasets["test"]

    if accelerator.is_main_process:
        print("saving tokenizer")
        # Saving the tokenizer
        tokenizer.save_pretrained(save_location)
        print("saved tokenizer")

    # 5. Create per-process samplers so each process reads its own shard
    train_sampler = DistributedSampler(
        tokenized_train_dataset, num_replicas=accelerator.num_processes, rank=accelerator.process_index, shuffle=True
    )

    eval_sampler = DistributedSampler(
        tokenized_eval_dataset, num_replicas=accelerator.num_processes, rank=accelerator.process_index, shuffle=False
    )

    # 6. Build the dataloaders
    train_dataloader = DataLoader(
        tokenized_train_dataset,
        batch_size=batch_size,
        drop_last=True,
        sampler=train_sampler
    )

    eval_dataloader = DataLoader(
        tokenized_eval_dataset,
        batch_size=batch_size*2,
        drop_last=(accelerator.mixed_precision == "fp8"),
        sampler=eval_sampler
    )
    accelerator.print("returning dataloaders")
    return train_dataloader, eval_dataloader

# 1. Initialize the accelerator with mixed precision and define training parameters via command-line arguments
# 2. Set the seed (if given as a command-line argument) for reproducibility
# 3. Get dataloaders
# 4. Initialize more training parameters and "prepare"/optimize them via Accelerate
# 5. Train/fine-tune the model with the new data & set parameters using the launched configuration (e.g. FSDP)
# 6. Evaluate the model for that epoch
# 7. Have the main process (first GPU) save the newly fine-tuned model
def training_function(args):
    # 1
    accelerator = Accelerator(mixed_precision=args.mixed_precision,
                              gradient_accumulation_steps=args.gradient_accumulation_steps)
    accelerator.print("set accelerator")
    lr = args.lr
    num_epochs = args.num_epochs
    batch_size = args.batch_size
    num_warmup_steps = args.num_warmup_steps

    # 2
    if args.seed is not None:
        set_seed(args.seed)

    # 3
    train_dataloader, eval_dataloader = get_dataloaders(
        accelerator, batch_size, args.model_name, args.data_location, args.save_location)
    accelerator.print("set dataloaders")

    # 4
    # Instantiate the model (we build the model here so that the seed also controls new weight initialization)
    model = AutoModelForCausalLM.from_pretrained(args.model_name)
    # model = accelerator.prepare(model)
    accelerator.print("set model")
    optimizer = AdamW(params=model.parameters(), lr=lr)
    accelerator.print("set optimizer")
    # Instantiate scheduler
    lr_scheduler = get_linear_schedule_with_warmup(
        optimizer=optimizer,
        num_warmup_steps=num_warmup_steps,
        num_training_steps=(len(train_dataloader) *
                            num_epochs) // args.gradient_accumulation_steps
    )
    accelerator.print("set lr_scheduler")
    # Prepare everything
    # There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the
    # prepare method.

    accelerator.wait_for_everyone()
    accelerator.print("preparing!")
    model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
        model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
    )

    accelerator.print("prepared stuff")
    # Initialize logging variables
    total_train_loss = 0
    total_eval_loss = 0

    # 5
    # Now we train the model
    for epoch in range(num_epochs):
        accelerator.print("training")
        model.train()
        total_train_loss = 0
        for batch in tqdm(train_dataloader, desc="Training"):
            with accelerator.accumulate(model):
                # Process the batch
                inputs = {k: v.to(accelerator.device)
                          for k, v in batch.items()}
                if "labels" not in inputs:
                    inputs["labels"] = inputs["input_ids"]

                outputs = model(**inputs)
                loss = outputs.loss
                total_train_loss += loss.item()
                accelerator.backward(loss)
                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()

        accelerator.wait_for_everyone()

        # 6
        # Evaluation loop after each training epoch
        model.eval()
        total_eval_loss = 0
        for batch in tqdm(eval_dataloader, "Evaluating"):
            with torch.no_grad():
                inputs = {k: v.to(accelerator.device)
                          for k, v in batch.items()}
                if "labels" not in inputs:
                    inputs["labels"] = inputs["input_ids"]

                outputs = model(**inputs)
                loss = outputs.loss
                total_eval_loss += loss.item()

            accelerator.wait_for_everyone()

        # Log the average losses
        avg_train_loss = total_train_loss / len(train_dataloader)
        avg_eval_loss = total_eval_loss / len(eval_dataloader)
        print(
            f"Epoch: {epoch}, Average Training Loss: {avg_train_loss}, Average Evaluation Loss: {avg_eval_loss}")

        accelerator.wait_for_everyone()

    # 7
    accelerator.wait_for_everyone()
    accelerator.print("saving")
    accelerator.unwrap_model(model).save_pretrained(
        args.save_location,
        is_main_process=accelerator.is_main_process,
        save_function=accelerator.save,
        state_dict=accelerator.get_state_dict(model),
    )

def main():
    args = parse_args()
    training_function(args)

if __name__ == "__main__":
    start = time()
    main()
    print(f"Total Execution Time: {time() - start} seconds")
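Step 5 of `get_dataloaders` gives each process its own `DistributedSampler`. As a pure-Python illustration of what that split looks like for the unshuffled case, the sketch below pads the index list so it divides evenly and then hands every `num_replicas`-th index to each rank. The function name `shard_indices` is hypothetical and this is only a simplified model of the sampler, not its actual implementation. As far as I understand, `accelerator.prepare` also shards dataloaders across processes, so it may be worth checking whether the explicit samplers are needed at all.

```python
import math


def shard_indices(dataset_len: int, num_replicas: int, rank: int) -> list:
    """Simplified model of an unshuffled DistributedSampler (drop_last=False)."""
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    indices = list(range(dataset_len))
    # Pad by repeating samples from the start so every rank gets the same count.
    indices += indices[: total_size - len(indices)]
    # Each rank takes every num_replicas-th index, starting at its own rank.
    return indices[rank:total_size:num_replicas]


shards = [shard_indices(5, num_replicas=2, rank=r) for r in range(2)]
for rank, shard in enumerate(shards):
    print(f"rank {rank}: {shard}")
```

With 5 samples and 2 processes, each rank ends up with 3 indices, and one sample is repeated to even out the shards.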

I'd run it via

$ accelerate launch file.py --num_processes 1 # or 2 depending on situation
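One detail about the command above: arguments placed after the script name are handed to the script, not to the launcher. Here `--num_processes` lands in the script's own `parse_args`, and since the training function never reads `args.num_processes`, the number of processes presumably still comes from the YAML config. A small sketch of what the script sees (the helper `parse_script_args` is made up for illustration and keeps only the relevant argument):

```python
import argparse


def parse_script_args(argv):
    # Mirrors only the --num_processes argument from the script's parse_args().
    parser = argparse.ArgumentParser(description="Training script arguments.")
    parser.add_argument("--num_processes", type=int,
                        default=2, help="Number of gpus to use.")
    return parser.parse_args(argv)


# Everything after `file.py` in `accelerate launch file.py ...` reaches the script.
args = parse_script_args(["--num_processes", "1"])
print(args.num_processes)
```

To change the process count from the command line, the launcher's own flag would go before the script name, e.g. `accelerate launch --num_processes 2 file.py`.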

Here's an example of my examples/preprocessed_data.json (not real data)

[
    {
        "CONSULTID": "61110688",
        "TAR_STATUS_NAME": "Closed",
        "CODE_ID": "108",
        "CODE_DESC": "Cancelled",
        "STATUS": "02.Cancelled",
        "YEAR_CREATED": "2023",
        "SUBMIT_TO_RESPOND": "3.17",
        "SUBMIT_TO_CLOSE": "30.06",
        "SPECIALTY_NAME": "GASTROENTEROLOGY - ADULT",
        "GENDER": "M",
        "AGE_AT_CONSULT": "69",
        "CREATED": "2023-01-03T12:15:16",
        "DOB": "1953-05-01",
        "PCP_NAME": "Armen Babaian",
        "SR_NAME": "James Tabibian",
        "ORG_NAME": "AAA - OVM Medi-Cal Ineligible Over 50",
        "ORG_TYPE": "OTHER",
        "DIAGNOSIS_CODE": "Z12.11",
        "CATEGORY_NAME": "Medicine/Non-Surg",
        "SUBCATEGORY_NAME": "GI",
        "PCP_MESSAGE": "Hi James, I have a patient with chronic constipation who has failed medical management. What are your recommendations?",
        "TQ_HEADER": "Clinical question",
        "SR_MESSAGE": "Hi Armen, thanks for your message. I would recommend you referring your patient to a gastroenterologist for further evaluation and treatment. They may need additional tests, such as a colonoscopy or endoscopy, to determine the cause of their constipation. Additionally, I recommend you discuss with your patient about dietary and lifestyle changes that may help relieve their symptoms."
    },
    {
        "CONSULTID": "61110688",
        "TAR_STATUS_NAME": "Closed",
        "CODE_ID": "108",
        "CODE_DESC": "Cancelled",
        "STATUS": "02.Cancelled",
        "YEAR_CREATED": "2023",
        "SUBMIT_TO_RESPOND": "3.17",
        "SUBMIT_TO_CLOSE": "30.06",
        "SPECIALTY_NAME": "GASTROENTEROLOGY - ADULT",
        "GENDER": "M",
        "AGE_AT_CONSULT": "69",
        "CREATED": "2023-01-03T12:15:16",
        "DOB": "1953-05-01",
        "PCP_NAME": "Armen Babaian",
        "SR_NAME": "James Tabibian",
        "ORG_NAME": "AAA - OVM Medi-Cal Ineligible Over 50",
        "ORG_TYPE": "OTHER",
        "DIAGNOSIS_CODE": "Z12.11",
        "CATEGORY_NAME": "Medicine/Non-Surg",
        "SUBCATEGORY_NAME": "GI",
        "PCP_MESSAGE": "Hi James, I have a patient with chronic constipation who has failed medical management. What are your recommendations?",
        "TQ_HEADER": "Clinical question",
        "SR_MESSAGE": "Hi Armen, thanks for your message. I would recommend you referring your patient to a gastroenterologist for further evaluation and treatment. They may need additional tests, such as a colonoscopy or endoscopy, to determine the cause of their constipation. Additionally, I recommend you discuss with your patient about dietary and lifestyle changes that may help relieve their symptoms."
    },
    ...
]
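To show how one of these records is used, here is a minimal, tokenizer-free sketch of the message construction done by `transform_example` in the script. Only the fields the script actually reads are included, and the `SR_MESSAGE` text is shortened here for brevity; the function name `to_messages` is made up for illustration.

```python
# Subset of the first record above; SR_MESSAGE is abbreviated for this sketch.
record = {
    "CONSULTID": "61110688",
    "AGE_AT_CONSULT": "69",
    "GENDER": "M",
    "DIAGNOSIS_CODE": "Z12.11",
    "PCP_MESSAGE": "Hi James, I have a patient with chronic constipation who has failed medical management. What are your recommendations?",
    "SR_MESSAGE": "Hi Armen, thanks for your message.",
}


def to_messages(example: dict) -> list:
    """Build the system/user/assistant messages the same way the script does."""
    system_message = (
        f"Consult ID: {example['CONSULTID']}. "
        f"Patient's age: {example['AGE_AT_CONSULT']}. "
        f"Gender: {example['GENDER']}. "
        f"Diagnosis Code: {example['DIAGNOSIS_CODE']}."
    )
    return [
        {"role": "system", "content": system_message},
        {"role": "user", "content": example["PCP_MESSAGE"]},
        {"role": "assistant", "content": example["SR_MESSAGE"]},
    ]


messages = to_messages(record)
for message in messages:
    print(f"{message['role']}: {message['content']}")
```

In the script, this list is then passed to `tokenizer.apply_chat_template(..., tokenize=False)` to produce the `formatted_chat` column; the other fields in the JSON are ignored.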

Expected behavior

.

BenjaminBossan commented 7 months ago

Just to be sure I understand correctly, you want to use DDP and you run out of memory? Could you please paste the full error message you get?

github-actions[bot] commented 6 months ago

This issue has been automatically marked as stale because it has not had recent activity. If you think this still needs to be addressed please comment on this thread.

Please note that issues that do not follow the contributing guidelines are likely to be ignored.