huggingface / trl

Train transformer language models with reinforcement learning.
http://hf.co/docs/trl
Apache License 2.0

Distributed training stuck #151

Closed. koking0 closed this issue 1 year ago.

koking0 commented 1 year ago

I'm using the gpt2-sentiment.py script from the examples for distributed training, with the dataset and reward model replaced by our own.

Training on a single GPU works fine.

When I use distributed training, however, the run gets stuck at stats = ppo_trainer.step(query_tensors, response_tensors, rewards) in the first epoch.

I added some print statements to ppo_trainer.py and found that it gets stuck in the gather_stats step. dist.all_reduce(v, dist.ReduceOp.SUM) is supposed to collect the stats listed below, but only some of them were actually gathered during the run.

stats.keys() = dict_keys(['objective/kl', 'objective/kl_dist', 'objective/logprobs', 'objective/ref_logprobs', 'objective/kl_coef', 'objective/entropy', 'ppo/mean_non_score_reward', 'ppo/loss/policy', 'ppo/loss/value', 'ppo/loss/total', 'ppo/policy/entropy', 'ppo/policy/approxkl', 'ppo/policy/policykl', 'ppo/policy/clipfrac', 'ppo/policy/advantages', 'ppo/policy/advantages_mean', 'ppo/policy/ratio', 'ppo/returns/mean', 'ppo/returns/var', 'ppo/val/vpred', 'ppo/val/error', 'ppo/val/clipfrac', 'ppo/val/mean', 'ppo/val/var', 'ppo/time/ppo/optimizer_step', 'ppo/val/var_explained'])

Below are the stats that were gathered successfully:

'objective/kl', ✅
'objective/entropy', ✅
'ppo/mean_non_score_reward', ✅
'ppo/loss/policy', ✅
'ppo/loss/value', ✅
'ppo/loss/total', ✅
'ppo/policy/entropy', ✅
'ppo/policy/approxkl', ✅
'ppo/policy/policykl', ✅
'ppo/policy/clipfrac', ✅

It looks like the operation hangs at this point because the remaining stats are never collected from the other processes.
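
For reference, the prints I added look roughly like this (a sketch in my own words, not the exact trl code; gather_stats reduces each value of the stats dict in turn):

# Sketch of the debug logging added around the collective in gather_stats
# (hypothetical helper name; the real trl code may differ): print every
# stat's shape on every rank right before dist.all_reduce, so the first key
# whose shapes disagree across ranks shows up in the logs.
import torch
import torch.distributed as dist

def all_reduce_with_logging(stats: dict) -> dict:
    rank = dist.get_rank()
    for k, v in stats.items():
        v = v if torch.is_tensor(v) else torch.tensor(v)
        v = v.cuda()
        print(f"{rank} start reduce process: {k}, size: {v.size()}", flush=True)
        dist.all_reduce(v, dist.ReduceOp.SUM)  # hangs if shapes differ across ranks
        print(f"{rank} finished reduce process: {k}", flush=True)
        stats[k] = v
    return stats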

My code:

import os
import time

from trl.core import LengthSampler

import argparse

import torch
from tqdm import tqdm

tqdm.pandas()

from fengshen import UbertPipelines
from transformers import AutoTokenizer
from datasets import load_dataset

from trl import PPOTrainer, PPOConfig, AutoModelForCausalLMWithValueHead, set_seed

config = PPOConfig(
    model_name="IDEA-CCNL/Wenzhong-GPT2-110M",
    learning_rate=1.41e-5,
    ppo_epochs=1,
    batch_size=32,
)
tokenizer = AutoTokenizer.from_pretrained(config.model_name)
tokenizer.pad_token = tokenizer.eos_token

def build_dataset(config, input_min_text_length=100, input_max_text_length=150):
    tokenizer = AutoTokenizer.from_pretrained(config.model_name)
    tokenizer.pad_token = tokenizer.eos_token

    data_files = {
        "train": "data/train.txt",
        "test": "data/test.txt",
        "validation": "data/valid.txt"
    }
    ds = load_dataset("text", data_files=data_files, split="train")
    ds = ds.rename_columns({"text": "review"})
    ds = ds.filter(lambda x: len(x["review"]) > 100, batched=False)

    input_size = LengthSampler(input_min_text_length, input_max_text_length)

    def tokenize(sample):
        sample["input_ids"] = tokenizer.encode(sample["review"])[: input_size()]
        sample["query"] = tokenizer.decode(sample["input_ids"])
        return sample

    ds = ds.map(tokenize, batched=False)
    ds.set_format(type="torch")
    return ds

# We retrieve the dataloader by calling the `build_dataset` function.
dataset = build_dataset(config)

def collator(data):
    return dict((key, [d[key] for d in data]) for key in data[0])

# set seed before initializing value head for deterministic eval
set_seed(config.seed)

# Now let's build the model, the reference model, and the tokenizer.
model = AutoModelForCausalLMWithValueHead.from_pretrained(config.model_name)
ref_model = AutoModelForCausalLMWithValueHead.from_pretrained(config.model_name)

# We then build the PPOTrainer, passing the model, the reference model, the tokenizer
ppo_trainer = PPOTrainer(config, model, ref_model, tokenizer, dataset=dataset, data_collator=collator)

device = ppo_trainer.accelerator.device
if ppo_trainer.accelerator.num_processes == 1:
    device = 0 if torch.cuda.is_available() else "cpu"  # to avoid a `pipeline` bug

total_parser = argparse.ArgumentParser("TASK NAME")
total_parser = UbertPipelines.pipelines_args(total_parser)
args = total_parser.parse_args()

# The reward model here is a binary classification.
args.pretrained_model_path = "./models/ubert/"
args.default_root_dir = "./checkpoint/"
pipeline = UbertPipelines(args)

generation_kwargs = {
    "min_length": -1,
    "top_k": 0.0,
    "top_p": 1.0,
    "do_sample": True,
    "pad_token_id": tokenizer.eos_token_id,
}

output_min_length = 50
output_max_length = 100
output_length_sampler = LengthSampler(output_min_length, output_max_length)

# Print the time when training starts
print(str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) + ": Starting training.")

def cur_time():
    return str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

for epoch, batch in tqdm(enumerate(ppo_trainer.dataloader)):
    query_tensors = batch["input_ids"]

    # Get response from gpt2
    response_tensors = []
    for query in query_tensors:
        gen_len = output_length_sampler()
        generation_kwargs["max_new_tokens"] = gen_len
        response = ppo_trainer.generate(query, **generation_kwargs)
        response_tensors.append(response.squeeze())
    batch["response"] = [tokenizer.decode(r.squeeze()) for r in response_tensors]

    # Compute sentiment score
    texts = [{
        "id": 1,
        "task_type": "分类任务",
        "subtask_type": "文本分类",
        "text": q + r,
        "choices": [{
            "entity_type": "销售",
            "label": 1,
            "entity_list": []
        }, {
            "entity_type": "模型",
            "label": 0,
            "entity_list": []
        }]
    } for q, r in zip(batch["query"], batch["response"])]
    pipe_outputs = pipeline.predict(texts)
    rewards = []
    for data in pipe_outputs:
        for choice in data["choices"]:
            if choice.get("score", None):
                rewards.append(torch.tensor(choice["score"]))
    print(cur_time() + f": len(rewards) = {len(rewards)}")

    # Run PPO step
    stats = ppo_trainer.step(query_tensors, response_tensors, rewards)
    print(cur_time() + f": epoch = {epoch} PPO step over")

ppo_trainer._save_pretrained(save_directory="models/gpt-ppo")

There is no error message.

Can anyone tell me what could be the cause? How can I solve it?

younesbelkada commented 1 year ago

Thanks a lot for your issue @koking0! Can you share more details about the environment you are using? Also, how do you run your script? I advise running it as follows.

Step 1:

accelerate config

Then choose multi-gpu.

Step 2:

accelerate launch your_script.py

koking0 commented 1 year ago

Thank you for your reply. I followed the steps you provided.

This is the config file I generated via accelerate config:

$ cat /home/user/.cache/huggingface/accelerate/default_config.yaml
compute_environment: LOCAL_MACHINE
deepspeed_config: {}
distributed_type: MULTI_GPU
downcast_bf16: 'no'
dynamo_backend: 'NO'
fsdp_config: {}
gpu_ids: 0,2,4,5,6,7
machine_rank: 0
main_training_function: main
megatron_lm_config: {}
mixed_precision: 'no'
num_machines: 1
num_processes: 6
rdzv_backend: static
same_network: true
use_cpu: false

I think I have narrowed the problem down further: the all_reduce operation first gets stuck on ppo/policy/advantages. Other entries might hang later as well, but this is the first one.

I checked the dimensions and found that they are not consistent across the cards, which is probably the main reason, but why does that happen?

log:

0 start reduce process: ppo/policy/advantages, size: torch.Size([7744])
2 start reduce process: ppo/policy/advantages, size: torch.Size([7360])
3 start reduce process: ppo/policy/advantages, size: torch.Size([7712])
1 start reduce process: ppo/policy/advantages, size: torch.Size([7328])
5 start reduce process: ppo/policy/advantages, size: torch.Size([7552])
0 start reduce process: ppo/policy/advantages_mean, size: torch.Size([1])
4 start reduce process: ppo/policy/advantages, size: torch.Size([7776])
2 start reduce process: ppo/policy/advantages_mean, size: torch.Size([1])
3 start reduce process: ppo/policy/advantages_mean, size: torch.Size([1])
1 start reduce process: ppo/policy/advantages_mean, size: torch.Size([1])
5 start reduce process: ppo/policy/advantages_mean, size: torch.Size([1])
4 start reduce process: ppo/policy/advantages_mean, size: torch.Size([1])
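
To illustrate my guess about the cause (a standalone sketch, not trl code): if each rank hands a tensor with a different number of elements to all_reduce, the collective never matches up and, at least with the NCCL backend, the processes typically just hang.

# Hypothetical minimal repro, launched with e.g. `torchrun --nproc_per_node=2 repro.py`:
# each rank builds a tensor whose length depends on its rank and then calls
# all_reduce. With mismatched sizes the collective is undefined behaviour and
# typically hangs, which matches the logs above.
import torch
import torch.distributed as dist

def main():
    dist.init_process_group(backend="nccl")  # torchrun provides the env:// rendezvous
    rank = dist.get_rank()
    torch.cuda.set_device(rank)
    t = torch.ones(1000 + rank, device="cuda")  # different numel on each rank
    dist.all_reduce(t, op=dist.ReduceOp.SUM)    # never completes cleanly
    print(f"rank {rank} done", flush=True)

if __name__ == "__main__":
    main()
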
younesbelkada commented 1 year ago

Hi @koking0, thanks for narrowing down the issue. I think I have found the bug: the advantages are computed with respect to each token of the response. In your case, the responses are stored like this:

response_tensors.append(response.squeeze())

So each process ends up with responses of a different length, which makes the script hang. In the gpt2-sentiment script we do:

response_tensors.append(response.squeeze()[-gen_len:])

My understanding is that this gives each process the same response length (I double-checked on my side with 2x NVIDIA T4). Can you please double check that?
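
Applied to the loop in your script (reusing the variables defined there), the change would look roughly like this:

# Generation loop from the script above with the fix applied: keep only the
# last `gen_len` generated tokens, i.e. drop the prompt, as in gpt2-sentiment.
response_tensors = []
for query in query_tensors:
    gen_len = output_length_sampler()
    generation_kwargs["max_new_tokens"] = gen_len
    response = ppo_trainer.generate(query, **generation_kwargs)
    response_tensors.append(response.squeeze()[-gen_len:])  # changed line
batch["response"] = [tokenizer.decode(r) for r in response_tensors]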

koking0 commented 1 year ago

Yes!

Thank you very much for your help in solving my problem.

I hope trl keeps getting better and better.

shizhediao commented 1 year ago

Update: it seems I actually got stuck at stats_to_np.

Hi, I encountered the same problem, where I got stuck at gather_stats. But I am using the official script as shown below. Could you help me take a look? Thanks!

########################################################################
# This is a fully working simple example to use trl with accelerate.
#
# This example fine-tunes a GPT2 model on the IMDB dataset using PPO
# (proximal policy optimization).
# in any of the following settings (with the same script):
#   - single CPU or single GPU
#   - multi GPUS (using PyTorch distributed mode)
#   - multi GPUS (using DeepSpeed ZeRO-Offload stages 1 & 2)
#   - fp16 (mixed-precision) or fp32 (normal precision)
#
# To run it in each of these various modes, first initialize the accelerate
# configuration with `accelerate config`
#
########################################################################

# 0. imports
import torch
import time
import argparse
import pandas as pd
from tqdm import tqdm
tqdm.pandas()

from transformers import pipeline, AutoTokenizer, set_seed
from datasets import load_dataset
from trl import PPOTrainer, PPOConfig, AutoModelForCausalLMWithValueHead, create_reference_model
from trl.core import respond_to_batch, LengthSampler

def arg_parser():
    parser = argparse.ArgumentParser(description="LLM-RLHF")
    parser.add_argument("--random_seed", type=int, default=1, help="random seed")
    parser.add_argument(
        "--dataset", type=str, default="gsm8k", choices=["gsm8k", "svamp", "aqua", "csqa", "asdiv", "last_letters", "addsub", "singleeq", "strategyqa"], help="dataset to inference"
    )
    # lvwerra/gpt2-imdb is a GPT2 model that was additionally fine-tuned on the IMDB
    # dataset for 1 epoch with the huggingface script (no special settings). The other
    # parameters are mostly taken from the original paper "Fine-Tuning Language Models
    # from Human Preferences". This model, as well as the BERT model, is available in the
    # Huggingface model zoo; the following code should automatically download the models.
    parser.add_argument(
        "--model_name", type=str, default="lvwerra/gpt2-imdb", help="base model used for training"
    )
    parser.add_argument(
        "--reward_model_name", type=str, default="lvwerra/distilbert-imdb", help="base model used for training"
    )
    parser.add_argument("--learning_rate", type=float, default=1.41e-5, help="learning rate")
    parser.add_argument("--task_name", type=str, default="sentiment-analysis", help="task name for pipeline")
    parser.add_argument("--batch_size", type=int, default=1, help="training batch size")
    parser.add_argument("--inference_batch_size", type=int, default=1, help="inference batch size")
    parser.add_argument("--forward_batch_size", type=int, default=1, help="forward batch size")
    parser.add_argument("--output_dir", type=str, default="./output_dir", help="output directory")
    parser.add_argument("--output_min_length", type=int, default=4, help="output_min_length")
    parser.add_argument("--output_max_length", type=int, default=16, help="output_max_length")
    parser.add_argument("--min_length", type=int, default=0, help="minimum length for generation")
    parser.add_argument("--do_sample", type=bool, default=True, help="sampling or not")
    parser.add_argument("--model_args", type=str, help="arguments for generator args")
    parser.add_argument("--local_rank", type=int, help="local rank")

    args = parser.parse_args()

    return args

def build_dataset(config, dataset_name="imdb", input_min_text_length=2, input_max_text_length=8):
    """
    Build dataset for training. This builds the dataset from `load_dataset`, one should 
    customize this function to train the model on its own dataset.

    Args:
        dataset_name (`str`): 
            The name of the dataset to be loaded.

    Returns:
        dataloader (`torch.utils.data.DataLoader`):
            The dataloader for the dataset.
    """
    tokenizer = AutoTokenizer.from_pretrained(config.model_name)
    tokenizer.pad_token = tokenizer.eos_token
    # load imdb with datasets
    ds = load_dataset(dataset_name, split='train')
    ds = ds.rename_columns({'text': 'review'})
    ds = ds.filter(lambda x: len(x["review"])>200, batched=False)

    input_size = LengthSampler(input_min_text_length, input_max_text_length)

    def tokenize(sample):
        sample["input_ids"] = tokenizer.encode(sample["review"])[:input_size()]
        sample["query"] = tokenizer.decode(sample["input_ids"])
        return sample

    ds = ds.map(tokenize, batched=False)
    ds.set_format(type='torch')
    return ds

def main():
    # 1. Set up configs
    args = arg_parser()
    ppo_config = PPOConfig(
        batch_size=args.batch_size,
        forward_batch_size=args.forward_batch_size,
        model_name=args.model_name,
        learning_rate=args.learning_rate,
        log_with="wandb",
    )
    sent_kwargs = {
        "return_all_scores": True,
        "function_to_apply": "none",
        "batch_size": ppo_config.forward_batch_size
    }
    # set seed before initializing value head for deterministic eval
    set_seed(args.random_seed)

    # 2. Load dataset
    dataset = build_dataset(ppo_config)
    def collator(data):
        return dict((key, [d[key] for d in data]) for key in data[0])

    # 3. load pretrained models
    model = AutoModelForCausalLMWithValueHead.from_pretrained(ppo_config.model_name)
    ref_model = AutoModelForCausalLMWithValueHead.from_pretrained(ppo_config.model_name)
    tokenizer = AutoTokenizer.from_pretrained(ppo_config.model_name)
    tokenizer.pad_token = tokenizer.eos_token

    # 4. initialize trainer
    ppo_trainer = PPOTrainer(ppo_config, model, ref_model, tokenizer, dataset=dataset, data_collator=collator)

    # 5. load reward model
    device = ppo_trainer.accelerator.device
    if ppo_trainer.accelerator.num_processes == 1:
        device = 0 if torch.cuda.is_available() else "cpu" # to avoid a `pipeline` bug
    sentiment_pipe = pipeline(args.task_name, model=args.reward_model_name, device=device)

    # 6. Training
    output_length_sampler = LengthSampler(args.output_min_length, args.output_max_length)
    # set up generation settings
    generation_kwargs = {
        "min_length":-1,
        "top_k": 0.0,
        "top_p": 1.0,
        "do_sample": True,
        "pad_token_id": tokenizer.eos_token_id
    }

    for batch_index, batch in enumerate(tqdm(ppo_trainer.dataloader, desc='Training')):
        print("batch_index", batch_index)
        query_tensors = batch['input_ids']

        #### Get response from gpt2
        response_tensors = []

        for query in query_tensors:
            gen_len = output_length_sampler()
            generation_kwargs["max_new_tokens"] = gen_len
            response = ppo_trainer.generate(query, **generation_kwargs)
            response_tensors.append(response.squeeze()[-gen_len:])
        batch['response'] = [tokenizer.decode(r.squeeze()) for r in response_tensors]

        #### Compute sentiment score
        texts = [q + r for q,r in zip(batch['query'], batch['response'])]

        pipe_outputs = sentiment_pipe(texts, **sent_kwargs)

        rewards = [torch.tensor(output[1]["score"]) for output in pipe_outputs]

        print("query_tensors", query_tensors)
        #### Run PPO step 
        stats = ppo_trainer.step(query_tensors, response_tensors, rewards)

        ppo_trainer.log_stats(stats, batch, rewards)

    # 7. Model Inspectation
    #### get a batch from the dataset
    inference_batch_size = args.inference_batch_size
    game_data = dict()
    dataset.set_format("pandas")
    df_batch = dataset[:].sample(inference_batch_size)
    game_data['query'] = df_batch['query'].tolist()
    query_tensors = df_batch['input_ids'].tolist()  # token ids are stored under 'input_ids'

    response_tensors_ref, response_tensors = [], []

    #### get response from gpt2 and gpt2_ref
    for i in range(inference_batch_size):
        gen_len = output_length_sampler()
        generation_kwargs["max_new_tokens"] = gen_len  # reuse the generation settings defined above
        output = ref_model.generate(torch.tensor(query_tensors[i]).unsqueeze(dim=0).to(device),
                                    **generation_kwargs).squeeze()[-gen_len:]
        response_tensors_ref.append(output)
        output = model.generate(torch.tensor(query_tensors[i]).unsqueeze(dim=0).to(device),
                                **generation_kwargs).squeeze()[-gen_len:]
        response_tensors.append(output)

    #### decode responses
    game_data['response (before)'] = [tokenizer.decode(response_tensors_ref[i]) for i in range(inference_batch_size)]
    game_data['response (after)'] = [tokenizer.decode(response_tensors[i]) for i in range(inference_batch_size)]

    #### sentiment analysis of query/response pairs before/after
    texts = [q + r for q,r in zip(game_data['query'], game_data['response (before)'])]
    game_data['rewards (before)'] = [output[1]["score"] for output in sentiment_pipe(texts, **sent_kwargs)]

    texts = [q + r for q,r in zip(game_data['query'], game_data['response (after)'])]
    game_data['rewards (after)'] = [output[1]["score"] for output in sentiment_pipe(texts, **sent_kwargs)]

    # store results in a dataframe
    df_results = pd.DataFrame(game_data)
    print(df_results)

    # 8. Save model and tokenizer locally
    model.save_pretrained(f'{args.output_dir}/rlhf-model-ppo', push_to_hub=True)
    tokenizer.save_pretrained(f'{args.output_dir}/rlhf-model-ppo', push_to_hub=True)

if __name__ == "__main__":
    main()

younesbelkada commented 1 year ago

I also faced a similar issue with the latest trl, which I fixed with https://github.com/lvwerra/trl/pull/222