This is a experimental repo to explore how to implement LLaMA with Deepspeed Pipeline Parallelism since the document is incomplete and very few projects are working on this. The repo hopes to provide a minimal prototype and training loop to implemement PP training for LLaMA and keep a note of possible bugs and the corresponding solutions.
We have provided a minimal template to launch hybrid training of PP and DP, and the config can be found in conf/llama_65b_metir_v1_pv91_v91_v5_0.yaml
.
It should be noted that the template cannot be directly run since this is extracted from another project and some parts are omitted.
But you can still quickly adapt it to your own usage by removing the relevant parts of dataset and collator initialization.
2023/07/02: Successfully enabling hybrid training of LLaMA-65B on two nodes with 16 * 80G A100.
2023/06/25: Repo established. Add some notes first and the code will soon be released when the clear is ready.
There are two main approaches to enable model initialization and loading pre-trained weights. One is first initializing the model using the from_pretrained
function of HuggingFace's transformers
repo.
In this case, you may refer to models.llama_ds_mp_wrap.get_model
for details.
The drawback of this method is that it will load the whole model for each worker. This will cause out-of-CPU-memory-usage when the model is large.
Another method is first initializing the sharded models with DeepSpeed's LayerSpec
class to implement post-initialization after pipeline parallelism partition. Then each rank only need to load the pre-trained weights for each own partition:
model_or_config = transformers.AutoConfig.from_pretrained(cfg.model_name_or_path)
layers = models.llama_ds_mp_wrap.get_layers_from_config(model_or_config)
model_pipe = PipelineModule(layers=layers,
num_stages=cfg.num_stages,
loss_fn=models.llama_ds_mp_wrap.loss_fn,
activation_checkpoint_interval=getattr(cfg, "activation_checkpoint_interval", 0)
)
...
model.load_checkpoint(cfg.model_name_or_path, load_module_only=True, load_optimizer_states=False, load_lr_scheduler_states=False)
Note that the pre-trained weights should be converted from HF format by using convert2ckpt.py
.
When dist.world_size
> num_stages
, hybrid training is automatically enabled. The number of stages of pipeline parallel (PP) is num_stages
while the degree of data-parallel (DP) is dist.world_size // num_stages
.
Different from traditional pre-trained language models, LLaMA do not need weight typing. So do not use TiedLayerSpec
to wrap embed_tokens
and lm_head
modules.
When hybrid training of PP and DP is enabled, DistributedSampler
should be carefully set for each rank w.r.t. its state (PP stage and DP group).
The core code snippet is as follows:
dp_degree = dist.get_world_size() // cfg.num_stages
if dp_degree > 1:
dp_id = model.grid.get_data_parallel_id()
sub_train_sampler = DistributedSampler(sub_train_dataset, num_replicas=dp_degree, rank=dp_id)
else:
sub_train_sampler = RandomSampler(sub_train_dataset)
In DeepSpeed design, among specific PP group, only the first and the last rank, i.e., stage=0 or stage=num_stages - 1
,
will fetch minibatch from dataloader, and the other ranks never fetch data.
Based on this, for the ranks where the dataloader will never be used, we can use placeholders to allocate the memory usage. This could be especially useful when training large models.
For example, when training LLaMA-65B with offload_optimizer=True
and num_stages=8
, the CPU memory usage is already nearly 800GB,
which will cause CPU memory OOM when you are using large dataset.
The code of dataset placeholder is as follows:
def load_empty_dataset_and_collator(cfg: DictConfig):
from data.test import TestDataset
from data.flan import FlanCollatorOverCollator
dataset = TestDataset(None, None, getattr(cfg, "total_dataset_len", -1))
collator = FlanCollatorOverCollator(collator=None,
tokenizer=cfg.model_name_or_path,
max_seq_length=128,
decoder_only=True,
return_standard_inputs=True,
)
# Keep consistent with `load_and_cache_examples`.
if getattr(cfg, "dist_load_data_barrier", True):
dist.barrier()
if dist.is_initialized():
dist.barrier()
return dataset, collator
if model.is_first_stage() or model.is_last_stage():
sub_train_dataset = load_and_cache_examples(cfg, tokenizer, _split="train", _file=_file)
if dp_degree > 1:
dp_id = model.grid.get_data_parallel_id()
sub_train_sampler = DistributedSampler(sub_train_dataset, num_replicas=dp_degree, rank=dp_id)
else:
sub_train_sampler = RandomSampler(sub_train_dataset)
sub_train_collator = hydra.utils.instantiate(cfg.collator) if "collator" in cfg and cfg.collator else None
sub_train_dataloader = DataLoader(dataset=sub_train_dataset,
sampler=sub_train_sampler,
batch_size=cfg.train_batch_size,
collate_fn=sub_train_collator,
num_workers=cfg.num_workers,
pin_memory=True,
prefetch_factor=cfg.prefetch_factor,
drop_last=True,
)
else:
sub_train_dataset, sub_train_collator = load_empty_dataset_and_collator(cfg)
sub_train_sampler = None
sub_train_dataloader = DataLoader(dataset=sub_train_dataset,
batch_size=cfg.train_batch_size,
collate_fn=sub_train_collator,
drop_last=True,
shuffle=False)
where TestDataset
is an empty dataset and the collator is arbitrary one meeting the input format.
Bfloat16 can be used by setting the following in deepspeed config:
data_types:
grad_accum_dtype: "fp32"
However, bfloat16 cannot be used with optimizer offload. Note that pipeline parallelism is designed not to support optimizer offload (see issue #3866). Nevertheless, it can still be enabled under fp16 training.
I cannot enable flash attention using both the original implementation or torch.nn.functional.scaled_dot_product_attention
from pytorch 2.0. See issue here and here.
Torch compilation is not supported in the template, which perhaps becuase my writing is incorrect.
//: # ()
//: # ()