philschmid / deep-learning-pytorch-huggingface

MIT License
659 stars 152 forks source link

CPU offload when not using offload deepspeed config file #19

Open siddharthvaria opened 1 year ago

siddharthvaria commented 1 year ago

Hi Philipp,

thanks for your awesome blog on training Flan T5 XXL. I am playing around with it and doing just zero-shot inference using ds_flan_t5_z3_config_bf16.json deepspeed config file. I believe this should not do any offload however I see the following in the deepspeed logs

[2023-06-15 17:30:29,691] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed info: version=0.9.4, git-hash=unknown, git-bra
nch=unknown
[2023-06-15 17:30:58,137] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed Flops Profiler Enabled: False
[2023-06-15 17:30:58,141] [INFO] [logging.py:96:log_dist] [Rank 0] Creating ZeRO Offload
[2023-06-15 17:30:58,223] [INFO] [utils.py:785:see_memory_usage] DeepSpeedZeRoOffload initialize [begin]
[2023-06-15 17:30:58,223] [INFO] [utils.py:786:see_memory_usage] MA 20.74 GB         Max_MA 20.74 GB         CA 20.74 GB
      Max_CA 21 GB
[2023-06-15 17:30:58,224] [INFO] [utils.py:793:see_memory_usage] CPU Virtual Memory:  used = 21.87 GB, percent = 2.9%
Parameter Offload: Total persistent parameters: 503808 in 124 params
[2023-06-15 17:31:08,932] [INFO] [utils.py:785:see_memory_usage] DeepSpeedZeRoOffload initialize [end]
[2023-06-15 17:31:08,933] [INFO] [utils.py:786:see_memory_usage] MA 2.59 GB         Max_MA 20.77 GB         CA 16.29 GB
     Max_CA 21 GB
[2023-06-15 17:31:08,933] [INFO] [utils.py:793:see_memory_usage] CPU Virtual Memory:  used = 21.97 GB, percent = 2.9%

I am also seeing logs mentioning trace cache. Is this related to CPU offload?

Invalidate trace cache @ step 0: expected module 2, but got module 0

Thanks again and looking forward to your reply.

philschmid commented 1 year ago

Can you share the code you use? Do you only want to do inference? What hardware do you have available?

siddharthvaria commented 1 year ago

Hi Phillip, many thanks for getting back. For the code, I modified your script. I have included the code below. I am essentially calling trainer.predict().

At this point, I am just running inference with the pre-trained Flan T5 XXL model. For the hardware I used g5.48xlarge instance. I enable bf16.

There is a related issue when I try doing the same (just inference with pre-trained Flan T5 XXL) on p3dn.24xlarge instance. This time I use fp32. When I use your config file (ds_flan_t5_z3_config.json) I get OOM even with a batch size of 1. Here as well, I do the same as in I call trainer.predict() and pass the whole CNN Dailymail test set as HF Dataset.

I did check Flan T5 XL on p3dn instance with fp32 and it works without OOM.

Let me know if you need other details.

Code:

import os
import pickle
import argparse
import numpy as np
from transformers import (
    AutoModelForSeq2SeqLM,
    DataCollatorForSeq2Seq,
    AutoTokenizer,
    set_seed,
)
from datasets import load_from_disk
# import evaluate
import nltk
import numpy as np

from huggingface_hub import HfFolder
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments

nltk.download("punkt", quiet=True)

# Metric
# metric = evaluate.load("rouge")
# evaluation generation args
gen_kwargs = {
    "early_stopping": True,
    "length_penalty": 2.0,
    "max_new_tokens": 50,
    "min_length": 30,
    "no_repeat_ngram_size": 3,
    "num_beams": 4,
}

def postprocess_text(preds, labels):
    preds = [pred.strip() for pred in preds]
    labels = [label.strip() for label in labels]

    # rougeLSum expects newline after each sentence
    preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in preds]
    labels = ["\n".join(nltk.sent_tokenize(label)) for label in labels]

    return preds, labels

def parse_arge():
    """Parse the arguments."""
    parser = argparse.ArgumentParser()
    # add model id and dataset path argument
    parser.add_argument("--do_train", default=False, action='store_true', help="Do training if passed")
    parser.add_argument("--do_infer", default=False, action='store_true', help="Do inference if passed")
    parser.add_argument("--model_id", type=str, default="google/flan-t5-xl", help="Model id to use for training.")
    parser.add_argument("--ft_model_path", type=str, default=None, help="Path to fine-tuned model for inference")
    parser.add_argument("--dataset_path", type=str, default=None, help="Path to the already processed dataset.")
    parser.add_argument(
        "--repository_id", type=str, default=None, help="Hugging Face Repository id for uploading models"
    )
    # add training hyperparameters for epochs, batch size, learning rate, and seed
    parser.add_argument("--epochs", type=int, default=3, help="Number of epochs to train for.")
    parser.add_argument("--per_device_train_batch_size", type=int, default=8, help="Batch size to use for training.")
    parser.add_argument("--per_device_eval_batch_size", type=int, default=8, help="Batch size to use for testing.")
    parser.add_argument("--generation_max_length", type=int, default=140, help="Maximum length to use for generation")
    parser.add_argument("--generation_num_beams", type=int, default=4, help="Number of beams to use for generation.")
    parser.add_argument("--lr", type=float, default=3e-3, help="Learning rate to use for training.")
    parser.add_argument("--seed", type=int, default=42, help="Seed to use for training.")
    parser.add_argument("--deepspeed", type=str, default=None, help="Path to deepspeed config file.")
    parser.add_argument("--gradient_checkpointing", type=bool, default=True, help="")
    parser.add_argument(
        "--bf16",
        type=bool,
        # default=True if torch.cuda.get_device_capability()[0] == 8 else False,
        default=False,
        help="Whether to use bf16.",
    )
    parser.add_argument(
        "--hf_token",
        type=str,
        default=HfFolder.get_token(),
        help="Token to use for uploading models to Hugging Face Hub.",
    )
    args = parser.parse_known_args()
    return args

"""
def training_function(args):
    # set seed
    set_seed(args.seed)

    # load dataset from disk and tokenizer
    train_dataset = load_from_disk(os.path.join(args.dataset_path, "train"))
    test_dataset = load_from_disk(os.path.join(args.dataset_path, "test"))
    tokenizer = AutoTokenizer.from_pretrained(args.model_id)
    # load model from the hub
    model = AutoModelForSeq2SeqLM.from_pretrained(
        args.model_id,
        use_cache=False if args.gradient_checkpointing else True,  # this is needed for gradient checkpointing
    )

    # we want to ignore tokenizer pad token in the loss
    label_pad_token_id = -100
    # Data collator
    data_collator = DataCollatorForSeq2Seq(
        tokenizer, model=model, label_pad_token_id=label_pad_token_id, pad_to_multiple_of=8
    )

    # Define compute metrics function
    def compute_metrics(eval_preds):
        preds, labels = eval_preds
        if isinstance(preds, tuple):
            preds = preds[0]
        decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
        # Replace -100 in the labels as we can't decode them.
        labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
        decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

        # Some simple post-processing
        decoded_preds, decoded_labels = postprocess_text(decoded_preds, decoded_labels)

        result = metric.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)
        result = {k: round(v * 100, 4) for k, v in result.items()}
        prediction_lens = [np.count_nonzero(pred != tokenizer.pad_token_id) for pred in preds]
        result["gen_len"] = np.mean(prediction_lens)
        return result

    # Define training args
    # output_dir = args.repository_id if args.repository_id else args.model_id.split("/")[-1]
    output_dir = args.model_id.split("/")[-1]
    training_args = Seq2SeqTrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=args.per_device_train_batch_size,
        per_device_eval_batch_size=args.per_device_eval_batch_size,
        predict_with_generate=True,
        generation_max_length=args.generation_max_length,
        generation_num_beams=args.generation_num_beams,
        fp16=False,  # T5 overflows with fp16
        bf16=args.bf16,  # Use BF16 if available
        learning_rate=args.lr,
        num_train_epochs=args.epochs,
        deepspeed=args.deepspeed,
        gradient_checkpointing=args.gradient_checkpointing,
        # logging & evaluation strategies
        logging_dir=f"{output_dir}/logs",
        logging_strategy="steps",
        logging_steps=500,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        save_total_limit=2,
        load_best_model_at_end=True,
        # push to hub parameters
        report_to="tensorboard",
        push_to_hub=True if args.repository_id else False,
        hub_strategy="every_save",
        hub_model_id=args.repository_id if args.repository_id else None,
        hub_token=args.hf_token,
    )

    # Create Trainer instance
    trainer = Seq2SeqTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
    )

    # Start training
    trainer.train()

    # Save our tokenizer and create model card
    tokenizer.save_pretrained(output_dir)
    trainer.create_model_card()
    # Push the results to the hub
    if args.repository_id:
        trainer.push_to_hub()
"""

def inference_function(args):
    # set seed
    set_seed(args.seed)

    # load dataset from disk and tokenizer
    test_dataset = load_from_disk(os.path.join(args.dataset_path, "test"))

    tokenizer = AutoTokenizer.from_pretrained(args.ft_model_path if args.ft_model_path else args.model_id)

    # load model from the hub
    model = AutoModelForSeq2SeqLM.from_pretrained(
        args.ft_model_path if args.ft_model_path else args.model_id,
        use_cache=False if args.gradient_checkpointing else True,  # this is needed for gradient checkpointing
    )

    # we want to ignore tokenizer pad token in the loss
    label_pad_token_id = -100
    # Data collator
    data_collator = DataCollatorForSeq2Seq(
        tokenizer, model=model, label_pad_token_id=label_pad_token_id, pad_to_multiple_of=8
    )

    # Define training args
    # output_dir = args.repository_id if args.repository_id else args.model_id.split("/")[-1]
    output_dir = args.model_id.split("/")[-1]
    inference_args = Seq2SeqTrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=args.per_device_train_batch_size,
        per_device_eval_batch_size=args.per_device_eval_batch_size,
        predict_with_generate=True,
        generation_max_length=args.generation_max_length,
        generation_num_beams=args.generation_num_beams,
        fp16=False,  # T5 overflows with fp16
        bf16=args.bf16,  # Use BF16 if available
        learning_rate=args.lr,
        num_train_epochs=args.epochs,
        deepspeed=args.deepspeed,
        gradient_checkpointing=args.gradient_checkpointing,
        # logging & evaluation strategies
        logging_dir=f"{output_dir}/logs",
        logging_strategy="steps",
        logging_steps=500,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        save_total_limit=2,
        load_best_model_at_end=True,
        # push to hub parameters
        report_to="tensorboard",
        push_to_hub=True if args.repository_id else False,
        hub_strategy="every_save",
        hub_model_id=args.repository_id if args.repository_id else None,
        hub_token=args.hf_token,
    )

    # Create Trainer instance
    trainer = Seq2SeqTrainer(
        model=model,
        args=inference_args,
        data_collator=data_collator,
    )

    # Start inference
    outputs = trainer.predict(test_dataset=test_dataset)
    all_predictions = tokenizer.batch_decode(outputs.predictions, skip_special_tokens=True)
    all_inputs = tokenizer.batch_decode(test_dataset["input_ids"], skip_special_tokens=True)
    all_labels = np.asarray(test_dataset["labels"])
    # Replace -100 in the labels as we can't decode them
    all_labels = np.where(all_labels != -100, all_labels, tokenizer.pad_token_id)
    all_labels = tokenizer.batch_decode(all_labels, skip_special_tokens=True)
    with open(os.path.join(output_dir,f"{output_dir}_predictions.pkl"), "wb") as handle:
        pickle.dump(
            {"documents": all_inputs, "reference_summaries": all_labels, "generated_summaries": all_predictions},
            handle,
            protocol=pickle.HIGHEST_PROTOCOL)

def main():
    args, _ = parse_arge()
    if args.do_train:
        training_function(args)
    if args.do_infer:
        inference_function(args)

if __name__ == "__main__":
    main()
Vie-Jay commented 1 year ago

On a similar note:

What prevents p3/p3dn type instances with V100 GPUs from training Flan-T5-XXL? I've tried using as many as 14 p3.16xl but training fails with OOM almost instantaneously with Flan-T5-XXL.

Trying Flan-T5-XL on 4 p3.16XL instances and debugging with sagemaker (CW metrics for GPU mem utilization are a red-herring and always shows ~100% usage on all nodes)

The following graphs show no more than 30% gpu mem usage on all 4 p3.16xl nodes at any point during training. By that logic, shouldn't Flan-T5-XXL also fit on the same cluster or even a bigger one (14 node cluster with p3.16xl I tried above failed with OOM almost instantaneously)

Screenshot 2023-07-05 at 1 52 09 PM

Screenshot 2023-07-05 at 1 51 56 PM

I'd like to understand the limitation of deepspeed's sharding abilities; Is the most granular piece that's being sharded still too big to fit on a single 16GB V100 gpu

I'd be surprised if that was the case

Or is it because each gpu tries to get weights from other gpus that it needs for computation and OOM due to the accumulated parameter size?