microsoft / DeepSpeedExamples

Example models using DeepSpeed
Apache License 2.0

RLHF problems when using Qwen model #861

Open 128Ghe980 opened 7 months ago

128Ghe980 commented 7 months ago

I'm trying to use the DeepSpeed-Chat stage 2 scripts to run RLHF with the Qwen-1.8B-Chat model. I changed some parts of dschat and main.py to load my model; the most important change is:

if 'Qwen' in model_name_or_path:
    critic_model = create_hf_model(AutoModelForCausalLM, model_name_or_path,
                                   tokenizer, ds_config, rlhf_training, dropout)
else:
    critic_model = create_hf_model(AutoModel, model_name_or_path, tokenizer,
                                   ds_config, rlhf_training, dropout)

So I load my model with "AutoModelForCausalLM" instead of "AutoModel" in model_utils.py, but it still fails.
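
Under the hood that branch boils down to roughly the following (just a sketch of the from_pretrained call, not the actual create_hf_model code; I'm assuming trust_remote_code=True is needed for Qwen's custom modeling_qwen.py):

from transformers import AutoConfig, AutoModelForCausalLM

# Sketch only: roughly what the AutoModelForCausalLM branch ends up doing.
# trust_remote_code=True is an assumption for Qwen's custom modeling code.
model_name_or_path = "/home/tione/notebook/model/Qwen-1_8B-Chat/"
model_config = AutoConfig.from_pretrained(model_name_or_path, trust_remote_code=True)
critic_base = AutoModelForCausalLM.from_pretrained(model_name_or_path,
                                                   config=model_config,
                                                   trust_remote_code=True)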

The error:

***** Running training *****
***** Evaluating reward, Epoch 0/1 *****
Beginning of Epoch 1/1, Total Micro Batches 8479
/usr/local/lib/python3.10/dist-packages/torch/utils/checkpoint.py:391: UserWarning: torch.utils.checkpoint: please pass in use_reentrant=True or use_reentrant=False explicitly. The default value of use_reentrant will be updated to be False in the future. To maintain current behavior, pass use_reentrant=True. It is recommended that you use use_reentrant=False. Refer to docs for more details on the differences between the two variants.
  warnings.warn(
Traceback (most recent call last):
  File "/home/tione/notebook/code/RLHF/main.py", line 441, in <module>
    main()
  File "/home/tione/notebook/code/RLHF/main.py", line 390, in main
    outputs = rm_model(**batch, use_cache=False)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1502, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1511, in _call_impl
    return forward_call(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/deepspeed/utils/nvtx.py", line 15, in wrapped_fn
    ret_val = func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/deepspeed/runtime/engine.py", line 1769, in forward
    loss = self.module(*inputs, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1502, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1548, in _call_impl
    result = forward_call(*args, **kwargs)
  File "/home/tione/notebook/code/RLHF/dschat/utils/model/reward_model.py", line 57, in forward
    transformer_outputs = self.rwtransformer(
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1502, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1548, in _call_impl
    result = forward_call(*args, **kwargs)
  File "/root/.cache/huggingface/modules/transformers_modules/modeling_qwen.py", line 1060, in forward
    lm_logits = self.lm_head(hidden_states)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1502, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/nn/modules/module.py", line 1537, in _call_impl
    result = hook(self, args)
  File "/usr/local/lib/python3.10/dist-packages/deepspeed/utils/nvtx.py", line 15, in wrapped_fn
    ret_val = func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/deepspeed/runtime/zero/parameter_offload.py", line 382, in _pre_forward_module_hook
    self.pre_sub_module_forward_function(module)
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/_contextlib.py", line 115, in decorate_context
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/deepspeed/runtime/zero/parameter_offload.py", line 494, in pre_sub_module_forward_function
    param_coordinator.fetch_sub_module(sub_module, forward=True)
  File "/usr/local/lib/python3.10/dist-packages/deepspeed/utils/nvtx.py", line 15, in wrapped_fn
    ret_val = func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/_contextlib.py", line 115, in decorate_context
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/deepspeed/runtime/zero/partitioned_param_coordinator.py", line 303, in fetch_sub_module
    assert param.ds_status == ZeroParamStatus.AVAILABLE, param.ds_summary()
AssertionError: {'id': 196, 'status': 'NOT_AVAILABLE', 'numel': 0, 'ds_numel': 0, 'shape': (0,), 'ds_shape': (0, 2048), 'requires_grad': True, 'grad_shape': None, 'persist': True, 'active_sub_modules': {297}, 'ds_tensor.shape': torch.Size([0])}
[2024-02-22 16:15:14,956] [INFO] [launch.py:315:sigkill_handler] Killing subprocess 12019
[2024-02-22 16:15:14,956] [ERROR] [launch.py:321:sigkill_handler] ['/usr/bin/python', '-u', 'main.py', '--local_rank=0', '--data_path', 'local/jsonfile', '--data_split', '2,4,4', '--model_name_or_path', '/home/tione/notebook/model/Qwen-1_8B-Chat/', '--per_device_train_batch_size', '2', '--per_device_eval_batch_size', '8', '--max_seq_len', '512', '--learning_rate', '9.65e-6', '--weight_decay', '0.1', '--num_padding_at_beginning', '0', '--num_train_epochs', '1', '--gradient_accumulation_steps', '4', '--lr_scheduler_type', 'cosine', '--num_warmup_steps', '0', '--seed', '1234', '--gradient_checkpointing', '--zero_stage', '3', '--deepspeed', '--output_dir', './output'] exits with return code = 1

So something goes wrong at this call: outputs = rm_model(**batch, use_cache=False)
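
In case it helps with debugging, here is a small sketch that maps the parameter id 196 from the assertion back to a name; it can be dropped in right before that call (ds_id, ds_status and ds_shape are attributes DeepSpeed attaches to parameters partitioned under ZeRO stage 3, and rm_model.module is the HF model wrapped inside the DeepSpeed engine):

# Debugging sketch: find which parameter the ZeRO-3 assertion (id 196) refers to.
for name, p in rm_model.module.named_parameters():
    if hasattr(p, "ds_id") and p.ds_id == 196:
        print(name, p.ds_status, tuple(p.ds_shape))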

Can anyone help?

My modified main.py:

#!/usr/bin/env python
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
import argparse
import math

import torch
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from torch.utils.data.distributed import DistributedSampler

from transformers import (
    SchedulerType,
    get_scheduler,
)

import deepspeed
from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam
from deepspeed.accelerator import get_accelerator

from dschat.utils.model.model_utils import create_critic_model
from dschat.utils.data.data_utils import create_prompt_dataset, DataCollatorReward
from dschat.utils.utils import print_rank_0, to_device, save_hf_format, set_random_seed, get_all_reduce_mean, get_optimizer_grouped_parameters, save_zero_three_model, load_hf_tokenizer
from dschat.utils.ds_utils import get_train_ds_config
from dschat.utils.module.lora import convert_linear_layer_to_lora, convert_lora_to_linear_layer, only_optimize_lora_parameters, make_model_gradient_checkpointing_compatible

def parse_args():
    parser = argparse.ArgumentParser(
        description=
        "Finetune a transformers model on a causal language modeling task")
    parser.add_argument('--data_path',
                        nargs='*',
                        default=['Dahoas/rm-static'],
                        help='Path to the training dataset. Accepted format:'
                        '1) a single data path, 2) multiple datasets in the'
                        'form: dataset1-path dataset2-path ...')
    parser.add_argument('--data_split',
                        type=str,
                        default='2,4,4',
                        help='Comma-separated list of proportions for training'
                        'phase 1, 2, and 3 data. For example the split `2,4,4`'
                        'will use 60%% of data for phase 1, 20%% for phase 2'
                        'and 20%% for phase 3.')
    parser.add_argument(
        '--data_output_path',
        type=str,
        default='/tmp/data_files/',
        help='Where to store the data-related files such as shuffle index.')
    parser.add_argument(
        "--model_name_or_path",
        type=str,
        help=
        "Path to pretrained model or model identifier from huggingface.co/models.",
        required=True,
    )
    parser.add_argument(
        "--num_padding_at_beginning",
        type=int,
        default=1,
        help=
        "OPT model has a fixed number (1) of padding tokens at the beginning of the input. "
        "We did not see this in other models but keep it as an option for now.",
    )
    parser.add_argument(
        "--per_device_train_batch_size",
        type=int,
        default=16,
        help="Batch size (per device) for the training dataloader.",
    )
    parser.add_argument(
        "--per_device_eval_batch_size",
        type=int,
        default=16,
        help="Batch size (per device) for the evaluation dataloader.",
    )
    parser.add_argument(
        "--max_seq_len",
        type=int,
        default=512,
        help="The maximum sequence length.",
    )
    parser.add_argument(
        "--learning_rate",
        type=float,
        default=5e-5,
        help=
        "Initial learning rate (after the potential warmup period) to use.",
    )
    parser.add_argument("--weight_decay",
                        type=float,
                        default=0.,
                        help="Weight decay to use.")
    parser.add_argument("--num_train_epochs",
                        type=int,
                        default=1,
                        help="Total number of training epochs to perform.")
    parser.add_argument(
        "--gradient_accumulation_steps",
        type=int,
        default=1,
        help=
        "Number of updates steps to accumulate before performing a backward/update pass.",
    )
    parser.add_argument(
        "--lr_scheduler_type",
        type=SchedulerType,
        default="cosine",
        help="The scheduler type to use.",
        choices=[
            "linear", "cosine", "cosine_with_restarts", "polynomial",
            "constant", "constant_with_warmup"
        ],
    )
    parser.add_argument(
        "--num_warmup_steps",
        type=int,
        default=0,
        help="Number of steps for the warmup in the lr scheduler.")
    parser.add_argument("--output_dir",
                        type=str,
                        default=None,
                        help="Where to store the model.")
    parser.add_argument("--seed",
                        type=int,
                        default=1234,
                        help="A seed for reproducible training.")
    parser.add_argument("--local_rank",
                        type=int,
                        default=-1,
                        help="local_rank for distributed training on gpus")
    parser.add_argument(
        '--gradient_checkpointing',
        action='store_true',
        help='Enable HF gradient checkpointing for Actor model.')
    parser.add_argument(
        "--dropout",
        type=float,
        default=None,
        help="If dropout configured, use it. "
        "Otherwise, keep the default dropout configuration of the model.")
    # deepspeed features
    parser.add_argument('--offload',
                        action='store_true',
                        help='Enable ZeRO Offload techniques.')
    parser.add_argument('--dtype',
                        type=str,
                        default='fp16',
                        choices=['fp16', 'bf16'],
                        help='Training data type')
    parser.add_argument(
        '--zero_stage',
        type=int,
        default=0,
        help='ZeRO optimization stage for Actor model (and clones).')
    ## LoRA for efficient training setting
    parser.add_argument("--lora_dim",
                        type=int,
                        default=0,
                        help="If > 0, use LoRA for efficient training.")
    parser.add_argument("--lora_module_name",
                        type=str,
                        default="decoder.layers.",
                        help="The scope of LoRA.")
    parser.add_argument('--only_optimize_lora',
                        action='store_true',
                        help='Only optimize the LoRA parameters.')
    parser.add_argument(
        "--lora_learning_rate",
        type=float,
        default=5e-4,
        help=
        "Initial LoRA learning rate (after the potential warmup period) to use."
    )

    # Evaluation
    parser.add_argument("--eval_interval",
                        type=int,
                        default=0,
                        help="If > 0, perform evaluation at this interval")
    parser.add_argument("--eval_iters",
                        type=int,
                        default=100,
                        help="Maximum evaluation iterations")
    ## low precision
    parser.add_argument(
        '--compute_fp32_loss',
        action='store_true',
        help='Relevant for low precision dtypes (fp16, bf16, etc.). '
        'If specified, loss is calculated in fp32.')

    ## Tensorboard logging
    parser.add_argument('--enable_tensorboard',
                        action='store_true',
                        help='Enable tensorboard logging')
    parser.add_argument('--tensorboard_path',
                        type=str,
                        default="step2_tensorboard")
    ## Tokenizer
    parser.add_argument(
        "--add_eot_token",
        action='store_true',
        help="Add <|endoftext|> as additional special token to tokenizer")
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()

    return args

def main():
    args = parse_args()

    if args.local_rank == -1:
        device = torch.device(get_accelerator().device_name())
    else:
        get_accelerator().set_device(args.local_rank)
        device = torch.device(get_accelerator().device_name(), args.local_rank)
        # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
        # torch.distributed.init_process_group(backend='nccl')
        deepspeed.init_distributed()

    args.global_rank = torch.distributed.get_rank()

    ds_config = get_train_ds_config(offload=args.offload,
                                    dtype=args.dtype,
                                    stage=args.zero_stage,
                                    enable_tensorboard=args.enable_tensorboard,
                                    tb_path=args.tensorboard_path,
                                    tb_name="step2_model")
    ds_config[
        'train_micro_batch_size_per_gpu'] = args.per_device_train_batch_size
    ds_config[
        'train_batch_size'] = args.per_device_train_batch_size * torch.distributed.get_world_size(
        ) * args.gradient_accumulation_steps

    # If passed along, set the training seed now.
    set_random_seed(args.seed)
    torch.distributed.barrier()

    # load_hf_tokenizer will get the correct tokenizer and set padding tokens based on the model family
    args.end_of_conversation_token = "<|endoftext|>"
    additional_special_tokens = args.end_of_conversation_token if args.add_eot_token else None
    tokenizer = load_hf_tokenizer(args.model_name_or_path,
                                  fast_tokenizer=True,
                                  add_special_tokens=additional_special_tokens)
    rm_model = create_critic_model(args.model_name_or_path,
                                   tokenizer,
                                   ds_config,
                                   args.num_padding_at_beginning,
                                   dropout=args.dropout,
                                   zero_stage=args.zero_stage,
                                   compute_fp32_loss=args.compute_fp32_loss)

    # Model bigscience/bloom-560m has large variance at ln_f.weight parameter
    # This makes bf16 finetuning hard.
    # In general, since we are replacing the model head, it makes sense to reset
    # the LN that precedes it.
    force_optimize_params = []
    if "bigscience/bloom-" in args.model_name_or_path:
        zero_init_enabled = (args.zero_stage == 3)
        params = [
            rm_model.rwtransformer.ln_f.weight,
            rm_model.rwtransformer.ln_f.bias
        ]
        with deepspeed.zero.GatheredParameters(params,
                                               modifier_rank=0,
                                               enabled=zero_init_enabled):
            if deepspeed.comm.get_rank() == 0 or not zero_init_enabled:
                torch.nn.init.ones_(rm_model.rwtransformer.ln_f.weight)
                torch.nn.init.zeros_(rm_model.rwtransformer.ln_f.bias)
        force_optimize_params.extend(
            ['rwtransformer.ln_f.weight', 'rwtransformer.ln_f.bias'])

    if args.lora_dim > 0:
        rm_model = convert_linear_layer_to_lora(rm_model,
                                                args.lora_module_name,
                                                args.lora_dim)
        if args.only_optimize_lora:
            force_optimize_params.append('v_head.weight')
            rm_model = only_optimize_lora_parameters(rm_model,
                                                     force_optimize_params)
            rm_model = make_model_gradient_checkpointing_compatible(rm_model)

    train_phase = 2
    train_dataset, eval_dataset = create_prompt_dataset(
        args.local_rank, args.data_path, args.data_split,
        args.data_output_path, train_phase, args.seed, tokenizer,
        args.max_seq_len)

    # DataLoaders creation:
    data_collator = DataCollatorReward()
    if args.local_rank == -1:
        train_sampler = RandomSampler(train_dataset)
        eval_sampler = SequentialSampler(eval_dataset)
    else:
        train_sampler = DistributedSampler(train_dataset)
        eval_sampler = DistributedSampler(eval_dataset)
    train_dataloader = DataLoader(train_dataset,
                                  collate_fn=data_collator,
                                  sampler=train_sampler,
                                  batch_size=args.per_device_train_batch_size)
    eval_dataloader = DataLoader(eval_dataset,
                                 collate_fn=data_collator,
                                 sampler=eval_sampler,
                                 batch_size=args.per_device_eval_batch_size)

    def evaluation_reward(model, dataloader, eval_iters):
        model.eval()
        correct_predictions = 0
        total_predictions = 0
        chosen_scores = 0.
        rejected_scores = 0.
        for _step, _batch in enumerate(dataloader):
            _batch = to_device(_batch, device)
            with torch.no_grad():
                _outputs = model(**_batch)

            chosen = _outputs["chosen_mean_scores"]
            rejected = _outputs["rejected_mean_scores"]
            correct_predictions += (chosen > rejected).sum()
            total_predictions += chosen.shape[0]
            chosen_scores += _outputs["chosen_mean_scores"].mean().float()
            rejected_scores += _outputs["rejected_mean_scores"].mean().float()
            if (_step + 1) == eval_iters:
                break
        _acc = correct_predictions / total_predictions
        chosen_scores = chosen_scores / (_step + 1)
        rejected_scores = rejected_scores / (_step + 1)
        try:
            _acc = get_all_reduce_mean(_acc).item()
            chosen_scores = get_all_reduce_mean(chosen_scores).item()
            rejected_scores = get_all_reduce_mean(rejected_scores).item()
        except:
            pass
        return chosen_scores, rejected_scores, _acc

    # Split weights in two groups, one with weight decay and the other not.
    optimizer_grouped_parameters = get_optimizer_grouped_parameters(
        rm_model, args.weight_decay, args.lora_learning_rate)

    AdamOptimizer = DeepSpeedCPUAdam if args.offload else FusedAdam
    optimizer = AdamOptimizer(optimizer_grouped_parameters,
                              lr=args.learning_rate,
                              betas=(0.9, 0.95))

    num_update_steps_per_epoch = math.ceil(
        len(train_dataloader) / args.gradient_accumulation_steps)

    lr_scheduler = get_scheduler(
        name=args.lr_scheduler_type,
        optimizer=optimizer,
        num_warmup_steps=args.num_warmup_steps,
        num_training_steps=args.num_train_epochs * num_update_steps_per_epoch,
    )

    rm_model, optimizer, _, lr_scheduler = deepspeed.initialize(
        model=rm_model,
        optimizer=optimizer,
        args=args,
        config=ds_config,
        lr_scheduler=lr_scheduler,
        dist_init_required=True)

    if args.gradient_checkpointing:
        rm_model.gradient_checkpointing_enable()

    # Train!
    print_rank_0("***** Running training *****", args.global_rank)

    print_rank_0(
        f"***** Evaluating reward, Epoch {0}/{args.num_train_epochs} *****",
        args.global_rank)
#     reward_score, reject_score, acc = evaluation_reward(
#         rm_model, eval_dataloader, args.eval_iters)
#     print_rank_0(
#         f"chosen_last_scores (higher is better) : {reward_score}, "
#         f"rejected_last_scores (lower is better) : {reject_score}, "
#         f"acc (higher is better) : {acc}", args.global_rank)

    total_micro_steps = 0
    for epoch in range(args.num_train_epochs):
        print_rank_0(
            f"Beginning of Epoch {epoch+1}/{args.num_train_epochs}, Total Micro Batches {len(train_dataloader)}",
            args.global_rank)
        rm_model.train()
        mean_loss = 0
        for step, batch in enumerate(train_dataloader):
            batch = to_device(batch, device)
            outputs = rm_model(**batch, use_cache=False)
            loss = outputs["loss"]
            rm_model.backward(loss)
            rm_model.step()
            mean_loss += loss.item()
            total_micro_steps += 1
            gas_boundary = (total_micro_steps %
                            args.gradient_accumulation_steps == 0)
            total_steps = total_micro_steps // args.gradient_accumulation_steps
            if args.eval_interval and gas_boundary and (
                    total_steps % args.eval_interval == 0):
                print_rank_0(f"Iter {total_steps}: Evaluating reward",
                             args.global_rank)
                reward_score, reject_score, acc = evaluation_reward(
                    rm_model, eval_dataloader, args.eval_iters)
                print_rank_0(
                    f"Iter {total_steps}: c_scores: {reward_score}, r_scores: {reject_score}, "
                    f"diff: {reward_score - reject_score}, acc: {acc}",
                    args.global_rank)
                rm_model.train()

        print_rank_0(
            f"Epoch {epoch+1}/{args.num_train_epochs} with loss {mean_loss/(step+1)}",
            args.global_rank)
        # Evaluate reward_loss on the validation set.
        print_rank_0(
            f"***** Evaluating reward, Epoch {epoch+1}/{args.num_train_epochs} *****",
            args.global_rank)
        reward_score, reject_score, acc = evaluation_reward(
            rm_model, eval_dataloader, args.eval_iters)
        print_rank_0(
            f"chosen_last_scores (higher is better) : {reward_score}, "
            f"rejected_last_scores (lower is better) : {reject_score}, "
            f"acc (higher is better) : {acc}", args.global_rank)
        rm_model.tput_timer.update_epoch_count()

    if args.output_dir is not None:
        print_rank_0('saving model ...', args.global_rank)
        rm_model = convert_lora_to_linear_layer(rm_model)

        if args.global_rank == 0:
            save_hf_format(rm_model, tokenizer, args)
        if args.zero_stage == 3:
            # for zero stage 3, each gpu only has a part of the model, so we need to save the model on each gpu by using DS-Engine
            save_zero_three_model(rm_model,
                                  args.global_rank,
                                  args.output_dir,
                                  zero_stage=args.zero_stage)

if __name__ == "__main__":
    main()

The .sh launch script:

#!/bin/bash
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
OUTPUT=$1
ZERO_STAGE=$2
if [ "$OUTPUT" == "" ]; then
    OUTPUT=./output
fi
if [ "$ZERO_STAGE" == "" ]; then
    ZERO_STAGE=3
fi
mkdir -p $OUTPUT

deepspeed main.py \
   --data_path local/jsonfile \
   --data_split 2,4,4 \
   --model_name_or_path /home/tione/notebook/model/Qwen-1_8B-Chat/ \
   --per_device_train_batch_size 2 \
   --per_device_eval_batch_size 8 \
   --max_seq_len 512 \
   --learning_rate 9.65e-6 \
   --weight_decay 0.1 \
   --num_padding_at_beginning 0 \
   --num_train_epochs 1  \
   --gradient_accumulation_steps 4 \
   --lr_scheduler_type cosine \
   --num_warmup_steps 0 \
   --seed 1234 \
   --gradient_checkpointing \
   --zero_stage $ZERO_STAGE \
   --deepspeed \
   --output_dir $OUTPUT \
#    &> $OUTPUT/training.log

Sun-Shiqi commented 6 months ago

I load Qwen like this, without any error:

create_hf_model(model_class=AutoModelForCausalLM,
                model_name_or_path=actor_model_name_or_path,
                tokenizer=self.tokenizer,
                ds_config=ds_config,
                dropout=self.args.actor_dropout)

Maybe your transformers or deepspeed version is not right.
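
To compare environments, a quick sketch for printing the relevant versions:

import deepspeed
import torch
import transformers

# Print the library versions so they can be compared against a working setup.
print("torch:", torch.__version__)
print("transformers:", transformers.__version__)
print("deepspeed:", deepspeed.__version__)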