Closed: apachemycat closed this issue 4 months ago
@apachemycat xtuner has not been tested with mmengine's FSDP; please consider combining it with DeepSpeed ZeRO-2/3 first.
Also, `framework='huggingface'` only supports configs written in this style: https://github.com/InternLM/xtuner/blob/main/xtuner/configs/internlm/internlm_7b/internlm_7b_qlora_oasst1_e3_hf.py
Thanks. The main consideration was that FSDP is natively supported by PyTorch, which makes it convenient, whereas DeepSpeed needs extra system deployment before it can be used, which is more of a hassle.
Actually, DeepSpeed is no hassle at all; a plain pip install is enough :)

```
pip install -U 'xtuner[deepspeed]'
```

Then pass `--deepspeed deepspeed_zero2` or `--deepspeed deepspeed_zero3` at training time (e.g. `xtuner train <config> --deepspeed deepspeed_zero2`) and you're set.
I ported the code below from mmengine's FSDP example, changed the model-loading logic, and commented out `model.to(torch.bfloat16)`; after that it can run with FSDP, but how do I confirm that the FSDP logic is actually being executed? Also, it feels like this code could be combined with xtuner, but I don't know how to combine them...
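One way to answer the "is FSDP really active" question, as a minimal sketch assuming the script below is used as-is: after `strategy.prepare(...)` returns, the wrapped model should contain `FullyShardedDataParallel` submodules, so printing the model, or counting those submodules, shows whether the auto-wrap policy actually fired.

```python
# Minimal sketch (not part of the original script): confirm that FSDP really
# wrapped the model returned by strategy.prepare(...).
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP


def count_fsdp_modules(model) -> int:
    """Number of submodules wrapped by FSDP via the auto-wrap policy."""
    return sum(isinstance(m, FSDP) for m in model.modules())


# After `model, optimizer, schedulers = strategy.prepare(...)`:
#   print(model)  # wrapped blocks print as FullyShardedDataParallel(...)
#   assert count_fsdp_modules(model) > 0, 'auto_wrap_policy did not fire'
```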
```python
import argparse
import copy
import os
from functools import partial

import torch
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset
from transformers import LlamaForCausalLM, LlamaTokenizer
from transformers.data import default_data_collator
from transformers.models.llama.modeling_llama import LlamaDecoderLayer

from mmengine import load
from mmengine._strategy import FSDPStrategy
from mmengine.dataset import DefaultSampler
from mmengine.dist.utils import is_main_process
from mmengine.optim import AmpOptimWrapper, CosineAnnealingLR, LinearLR, StepLR
from mmengine.utils import apply_to
from mmengine.visualization import Visualizer, WandbVisBackend
from xtuner.apis import DefaultTrainingArguments, build_model

os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "backend:cudaMallocAsync"

betas = (0.9, 0.999)
weight_decay = 0
ORI_BATCH_SIZE = 8
max_norm = 1  # grad clip

PROMPT_DICT = {
    'prompt_input':
    ('Below is an instruction that describes a task, paired with an input '
     'that provides further context. '
     'Write a response that appropriately completes the request.\n\n'
     '### Instruction:\n{instruction}\n\n'
     '### Input:\n{input}\n\n### Response:'),
    'prompt_no_input':
    ('Below is an instruction that describes a task. '
     'Write a response that appropriately completes the request.\n\n'
     '### Instruction:\n{instruction}\n\n### Response:'),
}
class AlpacaDataset(Dataset):

    def __init__(self, data_path, tokenizer, max_words=224):
        self.ann = load(data_path)
        self.max_words = max_words
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.ann)

    def __getitem__(self, index):
        ann = self.ann[index]
        if ann.get('input', '') == '':
            prompt = PROMPT_DICT['prompt_no_input'].format_map(ann)
        else:
            prompt = PROMPT_DICT['prompt_input'].format_map(ann)
        example = prompt + ann['output']
        prompt = torch.tensor(
            self.tokenizer.encode(prompt), dtype=torch.int64)
        example = self.tokenizer.encode(example)
        example.append(self.tokenizer.eos_token_id)
        example = torch.tensor(example, dtype=torch.int64)
        # Pad with -1 (a sentinel that is masked out below) or truncate
        # to max_words.
        padding = self.max_words - example.shape[0]
        if padding > 0:
            example = torch.cat(
                (example, torch.zeros(padding, dtype=torch.int64) - 1))
        elif padding < 0:
            example = example[:self.max_words]
        # Mask the prompt positions so only response tokens carry labels.
        labels = copy.deepcopy(example)
        labels[:len(prompt)] = -1
        example_mask = example.ge(0)
        label_mask = labels.ge(0)
        example[~example_mask] = 0
        labels[~label_mask] = 0
        example_mask = example_mask.float()
        label_mask = label_mask.float()
        return {
            'input_ids': example,
            'labels': labels,
            'attention_mask': example_mask,
        }
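
# A hypothetical sanity check (not in the original script; the path and
# variable names are made up): one sample should have the prompt positions
# zeroed out of `labels`, so only response tokens contribute to the loss.
#
#   tok = LlamaTokenizer.from_pretrained('/path/to/llama')
#   ds = AlpacaDataset('alpaca_data.json', tok)
#   s = ds[0]
#   assert s['input_ids'].shape == s['labels'].shape == (224,)
#   assert (s['labels'][:5] == 0).all()  # prompt tokens are masked to 0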
def parse_args():
    parser = argparse.ArgumentParser(description='Train alpaca with llama2')
    parser.add_argument('data_root', type=str)
    parser.add_argument('checkpoint', type=str)
    parser.add_argument('--output-dir', type=str, default='work_dirs')
    parser.add_argument('--max-epoch', type=int, default=3)
    parser.add_argument('--batch-size', type=int, default=4)
    parser.add_argument('--save-interval', type=int, default=500)
    args = parser.parse_args()
    return args


def train():
    args = parse_args()
    strategy = FSDPStrategy(
        model_wrapper=dict(
            auto_wrap_policy=partial(
                transformer_auto_wrap_policy,
                transformer_layer_cls={LlamaDecoderLayer})),
        state_dict_cfg='full',
        env_kwargs=dict(randomness=dict(seed=42)))
    visualizer = Visualizer(
        name='mmengine',
        save_dir=args.output_dir,
        vis_backends=[dict(type=WandbVisBackend)])

    # Prepare model for internlm2 by wuzhhui
    model, tokenizer = build_model(
        model_name_or_path=args.checkpoint, return_tokenizer=True)
    # Prepare model for llama
    # tokenizer = LlamaTokenizer.from_pretrained(args.checkpoint)
    # tokenizer.add_special_tokens({'pad_token': '<PAD>'})
    # model = LlamaForCausalLM.from_pretrained(args.checkpoint)
    # model.to(torch.bfloat16)
    model.train()

    # Prepare dataset
    train_dataset = AlpacaDataset(
        tokenizer=tokenizer, data_path=args.data_root)
    train_dataloader = DataLoader(
        train_dataset,
        batch_size=args.batch_size,
        sampler=DefaultSampler(train_dataset, seed=0),
        collate_fn=default_data_collator,
        drop_last=True)

    # Get the prepared model, scheduler and optimizer from strategy
    epoch_length = len(train_dataloader)
    max_iters = epoch_length * args.max_epoch
    optim_cfg = dict(
        type=AmpOptimWrapper,
        optimizer=dict(
            type=AdamW, lr=2e-4, betas=betas, weight_decay=weight_decay),
        clip_grad=dict(max_norm=max_norm, error_if_nonfinite=False),
        # Integer division: accumulative_counts must be an int.
        accumulative_counts=ORI_BATCH_SIZE // args.batch_size,
        loss_scale='dynamic',
        dtype='float16')
    scheduler_cfgs = [dict(type=StepLR, step_size=1, gamma=0.85)]
    model, optimizer, schedulers = strategy.prepare(
        model,
        optim_wrapper=optim_cfg,
        param_scheduler=scheduler_cfgs,
        dispatch_kwargs=dict(max_iters=max_iters, max_epochs=args.max_epoch))

    for epoch in range(args.max_epoch):
        for idx, inputs in enumerate(train_dataloader):
            # Convert inputs to target device.
            inputs = apply_to(inputs, lambda m: isinstance(m, torch.Tensor),
                              lambda m: m.cuda())
            loss = model(**inputs).loss
            optimizer.update_params(loss)
            max_memory = torch.cuda.max_memory_allocated()
            strategy.logger.info(f'Epoch: {epoch+1}/{args.max_epoch}, '
                                 f'Iter: {idx+1}/{epoch_length}, '
                                 f'Loss: {loss.item():.3f}, '
                                 f'Lr: {optimizer.get_lr()["lr"][0]:.6f} '
                                 f'Memory: {max_memory/1e9:.3f}G')
            visualizer.add_scalars({'loss': loss.item()})
            torch.cuda.reset_peak_memory_stats()

        for scheduler in schedulers:
            scheduler.step()

        save_dir = f'{args.output_dir}/epoch_{epoch+1}'
        state_dict = model.state_dict()
        if is_main_process():
            model.save_pretrained(save_dir, state_dict=state_dict)
            tokenizer.save_pretrained(save_dir)
if __name__ == '__main__':
    train()
```
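On the "how to combine this with xtuner" part, one hedged possibility is to replace the hand-rolled `AlpacaDataset` with xtuner's dataset pipeline and keep the rest of the FSDP script unchanged; the names below are taken from the xtuner config pasted later in this thread, and the exact import paths and signatures may vary across xtuner versions.

```python
# Hedged sketch, not a verified recipe: build the dataset with xtuner's
# pipeline instead of AlpacaDataset; `tokenizer` is the one returned by
# build_model(), and the JSON path is hypothetical.
from datasets import load_dataset
from xtuner.dataset import process_hf_dataset
from xtuner.dataset.map_fns import alpaca_map_fn, template_map_fn_factory
from xtuner.utils import PROMPT_TEMPLATE

train_dataset = process_hf_dataset(
    dataset=load_dataset('json', data_files='/path/to/alpaca_data.json'),
    tokenizer=tokenizer,
    max_length=512,
    dataset_map_fn=alpaca_map_fn,
    template_map_fn=template_map_fn_factory(
        template=PROMPT_TEMPLATE.llama3_chat),
    pack_to_max_length=True)
# For batching, xtuner's default_collate_fn (see the config below) would then
# replace transformers' default_data_collator in the DataLoader.
```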
@apachemycat We recommend using mmengine as the trainer; many of xtuner's features are not supported with the huggingface trainer.
Thanks, I'll give it a try.
```python
model_wrapper_cfg = dict(
    type='MMDistributedDataParallel', find_unused_parameters=True)

# Specify FSDPStrategy and configure its parameters
size_based_auto_wrap_policy = partial(
    size_based_auto_wrap_policy, min_num_params=1e7)
strategy = dict(
    type='FSDPStrategy',
    model_wrapper=dict(auto_wrap_policy=size_based_auto_wrap_policy))

model_wrapper_cfg = dict(
    type='MMFullyShardedDataParallel', cpu_offload=True, use_orig_params=False)
```
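For context, a sketch of where such a `strategy` dict would actually take effect, under the assumption that mmengine's experimental `FlexibleRunner` is used (this is not from the thread): a `strategy` dict is consumed by `FlexibleRunner`, while `model_wrapper_cfg` is read by the default `Runner`; neither applies once training is handed to the HF Trainer.

```python
# Minimal sketch (assumptions: mmengine >= 0.8 ships FlexibleRunner; the
# `model`, `train_dataloader`, `optim_wrapper`, and `param_scheduler` dicts
# come from a full config like the one pasted below).
from functools import partial

from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy
from mmengine.runner import FlexibleRunner

strategy = dict(
    type='FSDPStrategy',
    model_wrapper=dict(
        auto_wrap_policy=partial(
            size_based_auto_wrap_policy, min_num_params=1e7)))

runner = FlexibleRunner(
    model=model,
    work_dir='work_dirs/fsdp_demo',  # hypothetical output dir
    strategy=strategy,
    train_dataloader=train_dataloader,
    optim_wrapper=optim_wrapper,
    param_scheduler=param_scheduler,
    train_cfg=dict(by_epoch=True, max_epochs=3))
runner.train()
```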
This is the official example with slight changes (the data, etc.). In the logs I don't see `model_wrapper_cfg` being loaded or used at all; my impression is that when `framework='huggingface'`, this parameter is never automatically loaded from config.py and instantiated.

```python
# Model
framework = 'huggingface'
pretrained_model_name_or_path = '/models/meta-Llama-3-8B'
use_varlen_attn = False
# Data
data_files = ['/models/instruct-finetrain.json']
alpaca_en_path = 'tatsu-lab/alpaca'
prompt_template = PROMPT_TEMPLATE.llama3_chat
max_length = 512
pack_to_max_length = True
# parallel
sequence_parallel_size = 1
# Scheduler & Optimizer
batch_size = 8  # per_device
accumulative_counts = 4
accumulative_counts *= sequence_parallel_size
dataloader_num_workers = 0
max_epochs = 100
optim_type = AdamW
lr = 1e-5
betas = (0.9, 0.999)
weight_decay = 0
max_norm = 1  # grad clip
warmup_ratio = 0.01
# Save
save_steps = 20
save_total_limit = 2  # Maximum checkpoints to keep (-1 means unlimited)
# Evaluate the generation performance during the training
evaluation_freq = 20
SYSTEM = SYSTEM_TEMPLATE.alpaca
evaluation_inputs = [
    '请给我介绍五个上海的景点', 'Please tell me five scenic spots in Shanghai'
]
#######################################################################
#                      PART 2  Model & Tokenizer                      #
#######################################################################
tokenizer = dict(
    type=AutoTokenizer.from_pretrained,
    pretrained_model_name_or_path=pretrained_model_name_or_path,
    trust_remote_code=True,
    padding_side='right')

model = dict(
    type=SupervisedFinetune,
    use_varlen_attn=use_varlen_attn,
    llm=dict(
        type=AutoModelForCausalLM.from_pretrained,
        pretrained_model_name_or_path=pretrained_model_name_or_path,
        trust_remote_code=True,
        torch_dtype=torch.float16,
        quantization_config=dict(
            type=BitsAndBytesConfig,
            load_in_4bit=True,
            load_in_8bit=False,
            llm_int8_threshold=6.0,
            llm_int8_has_fp16_weight=False,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type='nf4')),
    lora=dict(
        type=LoraConfig,
        r=64,
        lora_alpha=16,
        lora_dropout=0.1,
        bias='none',
        task_type='CAUSAL_LM'))
training_args = dict(
    type=TrainingArguments,
    gradient_checkpointing=True,  # reduces memory at a slight decrease in speed
    gradient_checkpointing_kwargs={'use_reentrant': False})
#######################################################################
#                     PART 3  Dataset & Dataloader                    #
#######################################################################
alpaca_en = dict(
    type=process_hf_dataset,
    dataset=dict(type=load_dataset, path='json', data_files=data_files),
    tokenizer=tokenizer,
    max_length=max_length,
    dataset_map_fn=alpaca_map_fn,
    template_map_fn=dict(
        type=template_map_fn_factory, template=prompt_template),
    remove_unused_columns=True,
    shuffle_before_pack=True,
    pack_to_max_length=pack_to_max_length,
    use_varlen_attn=use_varlen_attn)

sampler = SequenceParallelSampler \
    if sequence_parallel_size > 1 else DefaultSampler
train_dataloader = dict(
    batch_size=batch_size,
    num_workers=dataloader_num_workers,
    dataset=alpaca_en,
    sampler=dict(type=sampler, shuffle=True),
    collate_fn=dict(type=default_collate_fn, use_varlen_attn=use_varlen_attn))
#######################################################################
#                    PART 4  Scheduler & Optimizer                    #
#######################################################################
# optimizer
optim_wrapper = dict(
    type=AmpOptimWrapper,
    optimizer=dict(
        type=optim_type, lr=lr, betas=betas, weight_decay=weight_decay),
    clip_grad=dict(max_norm=max_norm, error_if_nonfinite=False),
    accumulative_counts=accumulative_counts,
    loss_scale='dynamic',
    dtype='float16')

# learning policy
# More information: https://github.com/open-mmlab/mmengine/blob/main/docs/en/tutorials/param_scheduler.md  # noqa: E501
auto_scale_lr = dict(base_batch_size=4, enable=True)
param_scheduler = [
    dict(
        type=LinearLR,
        start_factor=1e-5,
        by_epoch=True,
        begin=0,
        end=10,
        convert_to_iter_based=True),
    dict(
        type=CosineAnnealingLR,
        eta_min=0.0,
        by_epoch=True,
        convert_to_iter_based=True)
]
# train, val, test setting
train_cfg = dict(type=TrainLoop, max_epochs=max_epochs)
#######################################################################
#                           PART 5  Runtime                           #
#######################################################################
# Log the dialogue periodically during the training process, optional
custom_hooks = [
    dict(type=DatasetInfoHook, tokenizer=tokenizer),
    dict(
        type=EvaluateChatHook,
        tokenizer=tokenizer,
        every_n_iters=evaluation_freq,
        evaluation_inputs=evaluation_inputs,
        system=SYSTEM,
        prompt_template=prompt_template)
]
if use_varlen_attn:
    custom_hooks += [dict(type=VarlenAttnArgsToMessageHubHook)]
```
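As for getting FSDP while keeping `framework='huggingface'`: since that mode hands training to the HF `Trainer` (and, as observed above, mmengine's `model_wrapper_cfg`/`strategy` keys are apparently never read there), a hedged sketch of the HF-native route would be to configure FSDP through `TrainingArguments` itself; whether xtuner forwards these fields is an assumption to verify.

```python
# Hedged sketch: HF-Trainer-native FSDP settings via TrainingArguments;
# key names follow transformers' fsdp_config schema, not mmengine's.
training_args = dict(
    type=TrainingArguments,
    fsdp='full_shard auto_wrap',
    fsdp_config={'transformer_layer_cls_to_wrap': ['LlamaDecoderLayer']},
    gradient_checkpointing=True,
    gradient_checkpointing_kwargs={'use_reentrant': False})
```

Note also that 4-bit bitsandbytes quantization (as in the model config above) has historically not worked with FSDP out of the box, which may matter here as well.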