Open suanflower opened 4 days ago
Hi @suanflower - can you please share your deepspeed version and ds_report and hardware?
Could you also try to format your original post a bit more so we can more clearly see repro steps as well as any repro scripts?
Thank you for your reply.
My environment is as follows:
deepspeed ==0.12.4
dsconfig
ds_config = { "train_batch_size": 16, "steps_per_print": 2000, "optimizer": { "type": "Adam", "params": { "lr": 0.001, "betas": [0.8, 0.999], "eps": 1e-8, "weight_decay": 3e-7, }, }, "scheduler": { "type": "WarmupLR", "params": { "warmup_min_lr": 0, "warmup_max_lr": 0.001, "warmup_num_steps": 1000, }, }, "gradient_clipping": 1.0, "prescale_gradients": False, "bf16": {"enabled": args.dtype == "bf16"}, "fp16": { "enabled": args.dtype == "fp16", "fp16_master_weights_and_grads": False, "loss_scale": 0, "loss_scale_window": 500, "hysteresis": 2, "min_loss_scale": 1, "initial_scale_power": 15, }, "wall_clock_breakdown": False, "zero_optimization": { "stage": args.stage, "allgather_partitions": True, "reduce_scatter": True, "allgather_bucket_size": 50000000, "reduce_bucket_size": 50000000, "overlap_comm": True, "contiguous_gradients": True, "cpu_offload": False, }, }
########################################################################
I am using the example of https://github.com/microsoft/DeepSpeedExamples/tree/faa0420554bce4d463a993ddc02de611ecc1404c/training/cifar
##################################################################### The code is as follows
import os
import deepspeed
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from deepspeed.accelerator import get_accelerator
from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from PIL import Image
import io
from torchvision import transforms
def add_argument():
parser = argparse.ArgumentParser(description="CIFAR")
# For train.
parser.add_argument(
"-e",
"--epochs",
default=30,
type=int,
help="number of total epochs (default: 30)",
)
parser.add_argument(
"--local_rank",
type=int,
default=-1,
help="local rank passed from distributed launcher",
)
parser.add_argument(
"--log-interval",
type=int,
default=2000,
help="output logging information at a given interval",
)
# For mixed precision training.
parser.add_argument(
"--dtype",
default="fp16",
type=str,
choices=["bf16", "fp16", "fp32"],
help="Datatype used for training",
)
# For ZeRO Optimization.
parser.add_argument(
"--stage",
default=2,
type=int,
choices=[0, 1, 2, 3],
help="Datatype used for training",
)
# For MoE (Mixture of Experts).
parser.add_argument(
"--moe",
default=False,
action="store_true",
help="use deepspeed mixture of experts (moe)",
)
parser.add_argument(
"--ep-world-size", default=1, type=int, help="(moe) expert parallel world size"
)
parser.add_argument(
"--num-experts",
type=int,
nargs="+",
default=[
1,
],
help="number of experts list, MoE related.",
)
parser.add_argument(
"--mlp-type",
type=str,
default="standard",
help="Only applicable when num-experts > 1, accepts [standard, residual]",
)
parser.add_argument(
"--top-k", default=1, type=int, help="(moe) gating top 1 and 2 supported"
)
parser.add_argument(
"--min-capacity",
default=0,
type=int,
help="(moe) minimum capacity of an expert regardless of the capacity_factor",
)
parser.add_argument(
"--noisy-gate-policy",
default=None,
type=str,
help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter",
)
parser.add_argument(
"--moe-param-group",
default=False,
action="store_true",
help="(moe) create separate moe param groups, required when using ZeRO w. MoE",
)
# Include DeepSpeed configuration arguments.
parser = deepspeed.add_config_arguments(parser)
args = parser.parse_args()
return args
def create_moe_param_groups(model):
"""Create separate parameter groups for each expert."""
parameters = {"params": [p for p in model.parameters()], "name": "parameters"}
return split_params_into_different_moe_groups_for_optimizer(parameters)
def get_ds_config(args):
"""Get the DeepSpeed configuration dictionary."""
print(args.stage)
ds_config = {
"train_batch_size": 16,
"steps_per_print": 2000,
"optimizer": {
"type": "Adam",
"params": {
"lr": 0.001,
"betas": [0.8, 0.999],
"eps": 1e-8,
"weight_decay": 3e-7,
},
},
"scheduler": {
"type": "WarmupLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": 0.001,
"warmup_num_steps": 1000,
},
},
"gradient_clipping": 1.0,
"prescale_gradients": False,
"bf16": {"enabled": args.dtype == "bf16"},
"fp16": {
"enabled": args.dtype == "fp16",
"fp16_master_weights_and_grads": False,
"loss_scale": 0,
"loss_scale_window": 500,
"hysteresis": 2,
"min_loss_scale": 1,
"initial_scale_power": 15,
},
"wall_clock_breakdown": False,
"zero_optimization": {
"stage": args.stage,
"allgather_partitions": True,
"reduce_scatter": True,
"allgather_bucket_size": 50000000,
"reduce_bucket_size": 50000000,
"overlap_comm": True,
"contiguous_gradients": True,
"cpu_offload": False,
},
}
return ds_config
class Net(nn.Module):
def __init__(self, args):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.moe = args.moe
if self.moe:
fc3 = nn.Linear(84, 84)
self.moe_layer_list = []
for n_e in args.num_experts:
# Create moe layers based on the number of experts.
self.moe_layer_list.append(
deepspeed.moe.layer.MoE(
hidden_size=84,
expert=fc3,
num_experts=n_e,
ep_size=args.ep_world_size,
use_residual=args.mlp_type == "residual",
k=args.top_k,
min_capacity=args.min_capacity,
noisy_gate_policy=args.noisy_gate_policy,
)
)
self.moe_layer_list = nn.ModuleList(self.moe_layer_list)
self.fc4 = nn.Linear(84, 10)
else:
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
if self.moe:
for layer in self.moe_layer_list:
x, _, _ = layer(x)
x = self.fc4(x)
else:
x = self.fc3(x)
return x
def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):
"""Test the network on the test data.
Args:
model_engine (deepspeed.runtime.engine.DeepSpeedEngine): the DeepSpeed engine.
testset (torch.utils.data.Dataset): the test dataset.
local_device (str): the local device name.
target_dtype (torch.dtype): the target datatype for the test data.
test_batch_size (int): the test batch size.
"""
# The 10 classes for CIFAR10.
classes = (
"plane",
"car",
"bird",
"cat",
"deer",
"dog",
"frog",
"horse",
"ship",
"truck",
)
# Define the test dataloader.
testloader = torch.utils.data.DataLoader(
testset, batch_size=test_batch_size, shuffle=False, num_workers=0
)
# For total accuracy.
correct, total = 0, 0
# For accuracy per class.
class_correct = list(0.0 for i in range(10))
class_total = list(0.0 for i in range(10))
# Start testing.
model_engine.eval()
with torch.no_grad():
for data in testloader:
images, labels = data
if target_dtype != None:
images = images.to(target_dtype)
outputs = model_engine(images.to(local_device))
_, predicted = torch.max(outputs.data, 1)
# Count the total accuracy.
total += labels.size(0)
correct += (predicted == labels.to(local_device)).sum().item()
# Count the accuracy per class.
batch_correct = (predicted == labels.to(local_device)).squeeze()
for i in range(test_batch_size):
label = labels[i]
class_correct[label] += batch_correct[i].item()
class_total[label] += 1
if model_engine.local_rank == 0:
print(
f"Accuracy of the network on the {total} test images: {100 * correct / total : .0f} %"
)
# For all classes, print the accuracy.
for i in range(10):
print(
f"Accuracy of {classes[i] : >5s} : {100 * class_correct[i] / class_total[i] : 2.0f} %"
)
class ParquetDataset(Dataset):
def __init__(self, parquet_file, transform=None):
self.data = pd.read_parquet(parquet_file)
self.transform = transform
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
# print(self.data.iloc[idx])
img_data = self.data.iloc[idx]['img']['bytes']
label = self.data.iloc[idx]['label']
if isinstance(img_data, bytes):
img = Image.open(io.BytesIO(img_data))
else:
img = img_data
if self.transform:
img = self.transform(img)
return img, label
def main(args):
# Initialize DeepSpeed distributed backend.
deepspeed.init_distributed()
_local_rank = int(os.environ.get("LOCAL_RANK"))
get_accelerator().set_device(_local_rank)
########################################################################
# Step1. Data Preparation.
#
# The output of torchvision datasets are PILImage images of range [0, 1].
# We transform them to Tensors of normalized range [-1, 1].
#
# Note:
# If running on Windows and you get a BrokenPipeError, try setting
# the num_worker of torch.utils.data.DataLoader() to 0.
########################################################################
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
if torch.distributed.get_rank() != 0:
# Might be downloading cifar data, let rank 0 download first.
torch.distributed.barrier()
# Load or download cifar data.
# trainset = torchvision.datasets.CIFAR10(
# root="./data", train=True, download=True, transform=transform
# )
# testset = torchvision.datasets.CIFAR10(
# root="./data", train=False, download=True, transform=transform
# )
trainset = ParquetDataset(parquet_file="ref_data/plain_text/train-00000-of-00001.parquet", transform=transform)
testset = ParquetDataset(parquet_file="ref_data/plain_text/test-00000-of-00001.parquet", transform=transform)
if torch.distributed.get_rank() == 0:
# Cifar data is downloaded, indicate other ranks can proceed.
torch.distributed.barrier()
########################################################################
# Step 2. Define the network with DeepSpeed.
#
# First, we define a Convolution Neural Network.
# Then, we define the DeepSpeed configuration dictionary and use it to
# initialize the DeepSpeed engine.
########################################################################
net = Net(args)
# Get list of parameters that require gradients.
parameters = filter(lambda p: p.requires_grad, net.parameters())
# If using MoE, create separate param groups for each expert.
if args.moe_param_group:
parameters = create_moe_param_groups(net)
# Initialize DeepSpeed to use the following features.
# 1) Distributed model.
# 2) Distributed data loader.
# 3) DeepSpeed optimizer.
ds_config = get_ds_config(args)
for name, param in net.named_parameters():
try:
print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
except:
print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
model_engine, optimizer, trainloader, __ = deepspeed.initialize(
args=args,
model=net,
model_parameters=parameters,
training_data=trainset,
config=ds_config,
)
# trainloader = DataLoader(trainset, batch_size=16, shuffle=True)
# Get the local device name (str) and local rank (int).
local_device = get_accelerator().device_name(model_engine.local_rank)
local_rank = model_engine.local_rank
# For float32, target_dtype will be None so no datatype conversion needed.
target_dtype = None
if model_engine.bfloat16_enabled():
target_dtype = torch.bfloat16
elif model_engine.fp16_enabled():
target_dtype = torch.half
for name, param in model_engine.named_parameters():
try:
print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
except:
print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
# Define the Classification Cross-Entropy loss function.
criterion = nn.CrossEntropyLoss()
########################################################################
# Step 3. Train the network.
#
# This is when things start to get interesting.
# We simply have to loop over our data iterator, and feed the inputs to the
# network and optimize. (DeepSpeed handles the distributed details for us!)
########################################################################
for epoch in range(args.epochs): # loop over the dataset multiple times
running_loss = 0.0
for i, data in enumerate(trainloader):
# Get the inputs. ``data`` is a list of [inputs, labels].
inputs, labels = data[0].to(local_device), data[1].to(local_device)
# print(f'## inputs: {inputs}, labels: {labels}')
# Try to convert to target_dtype if needed.
if target_dtype != None:
inputs = inputs.to(target_dtype)
outputs = model_engine(inputs)
loss = criterion(outputs, labels)
model_engine.backward(loss)
grads = {}
for name, param in model_engine.named_parameters():
if param.requires_grad and param.grad is not None:
grads[name] = param.grad
# print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
# 输出梯度
# print(grads)
model_engine.step()
# for name, param in model_engine.named_parameters():
# try:
# print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
# except:
# print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
# for name, param in net.named_parameters():
# try:
# print(f"netmodel_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
# except:
# print(f"netmodel_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
# Print statistics
running_loss += loss.item()
if local_rank == 0 and i % args.log_interval == (
args.log_interval - 1
): # Print every log_interval mini-batches.
print(
f"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {running_loss / args.log_interval : .3f}, grads: {grads}"
)
running_loss = 0.0
print("Finished Training")
########################################################################
# Step 4. Test the network on the test data.
########################################################################
test(model_engine, testset, local_device, target_dtype)
if __name__ == "__main__":
args = add_argument()
main(args)```
grad
model_enginenew**module.fc1.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc2.weight: grad is None ===param.requires_grad: True
model_enginenew**module.fc2.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc3.weight: grad is None ===param.requires_grad: True
model_enginenew**module.fc3.bias: grad is None ===param.requires_grad: True
model_enginenew**module.conv1.weight: grad is None ===param.requires_grad: True
model_enginenew**module.conv1.bias: grad is None ===param.requires_grad: True
model_enginenew**module.conv2.weight: grad is None ===param.requires_grad: True
model_enginenew**module.conv2.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc1.weight: grad is None ===param.requires_grad: True```
import argparse import os
import deepspeed import torch import torch.nn as nn import torch.nn.functional as F import torchvision import torchvision.transforms as transforms from deepspeed.accelerator import get_accelerator from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer from torch.utils.data import Dataset, DataLoader import pandas as pd from PIL import Image import io from torchvision import transforms def add_argument(): parser = argparse.ArgumentParser(description="CIFAR")
def create_moe_param_groups(model): """Create separate parameter groups for each expert.""" parameters = {"params": [p for p in model.parameters()], "name": "parameters"} return split_params_into_different_moe_groups_for_optimizer(parameters)
def get_ds_config(args): """Get the DeepSpeed configuration dictionary.""" print(args.stage) ds_config = { "train_batch_size": 16, "steps_per_print": 2000, "optimizer": { "type": "Adam", "params": { "lr": 0.001, "betas": [0.8, 0.999], "eps": 1e-8, "weight_decay": 3e-7, }, }, "scheduler": { "type": "WarmupLR", "params": { "warmup_min_lr": 0, "warmup_max_lr": 0.001, "warmup_num_steps": 1000, }, }, "gradient_clipping": 1.0, "prescale_gradients": False, "bf16": {"enabled": args.dtype == "bf16"}, "fp16": { "enabled": args.dtype == "fp16", "fp16_master_weights_and_grads": False, "loss_scale": 0, "loss_scale_window": 500, "hysteresis": 2, "min_loss_scale": 1, "initial_scale_power": 15, }, "wall_clock_breakdown": False, "zero_optimization": { "stage": args.stage, "allgather_partitions": True, "reduce_scatter": True, "allgather_bucket_size": 50000000, "reduce_bucket_size": 50000000, "overlap_comm": True, "contiguous_gradients": True, "cpu_offload": False, }, } return ds_config
class Net(nn.Module): def init(self, args): super(Net, self).init() self.conv1 = nn.Conv2d(3, 6, 5) self.pool = nn.MaxPool2d(2, 2) self.conv2 = nn.Conv2d(6, 16, 5) self.fc1 = nn.Linear(16 5 5, 120) self.fc2 = nn.Linear(120, 84) self.moe = args.moe if self.moe: fc3 = nn.Linear(84, 84) self.moe_layer_list = [] for n_e in args.num_experts:
Create moe layers based on the number of experts.
def test(model_engine, testset, local_device, target_dtype, test_batch_size=4): """Test the network on the test data.
自定义数据集类
class ParquetDataset(Dataset): def init(self, parquet_file, transform=None):
读取 parquet 文件
def main(args):
Initialize DeepSpeed distributed backend.
if name == "main": args = add_argument() main(args) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ bash: deepspeed --num_gpus 4 --num_nodes 1 --hostfile /etc/aistudio/hostfile --master_addr $MASTER_ADDR --ssh_port 20023 ref.py --stage 3
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ model_enginenewmodule.conv1.bias: grad is None ===param.requires_grad: True model_enginenewmodule.conv2.weight: grad is None ===param.requires_grad: True model_enginenewmodule.conv2.weight: grad is None ===param.requires_grad: True model_enginenewmodule.conv2.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.conv2.weight: grad is None ===param.requires_grad: True model_enginenewmodule.conv2.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.fc1.weight: grad is None ===param.requires_grad: True model_enginenewmodule.conv2.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.fc1.weight: grad is None ===param.requires_grad: True model_enginenewmodule.fc1.bias: grad is None ===param.requires_grad: True model_enginenewmodule.conv1.weight: grad is None ===param.requires_grad: True model_enginenewmodule.fc1.weight: grad is None ===param.requires_grad: True model_enginenew**module.fc1.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc2.weight: grad is None ===param.requires_grad: True
model_enginenewmodule.fc1.bias: grad is None ===param.requires_grad: True model_enginenewmodule.fc2.bias: grad is None ===param.requires_grad: True model_enginenew**module.conv1.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.fc2.weight: grad is None ===param.requires_grad: True model_enginenewmodule.fc2.weight: grad is None ===param.requires_grad: True
model_enginenewmodule.fc3.weight: grad is None ===param.requires_grad: True model_enginenewmodule.fc2.bias: grad is None ===param.requires_grad: True model_enginenewmodule.fc2.bias: grad is None ===param.requires_grad: True model_enginenewmodule.fc3.bias: grad is None ===param.requires_grad: True
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ How to fix