Closed chengjianhong closed 3 years ago
Hi, thanks for providing this information. Is it possible for you to provide a simple script that causes this issue so that I can track the cause of it?
Hi, you can set up multi-task learning with DistributedDataParallel by following the tutorial at https://pytorch.org/tutorials/intermediate/ddp_tutorial.html. My dataset consists of medical images, so it is hard to share it together with the script.
Hi, I tried running the toy example for `DistributedDataParallel` shown below. It seems that my implementation works in the `DistributedDataParallel` scenario. I am willing to help if you can provide more information about this issue.
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from pcgrad import PCGrad
class TestNet(nn.Module):
    """Minimal single-layer network used by the toy DDP example."""

    def __init__(self) -> None:
        super().__init__()
        # One affine layer mapping 3 input features to 4 outputs.
        self._linear = nn.Linear(in_features=3, out_features=4)

    def forward(self, x):
        """Apply the linear layer to ``x`` and return the result."""
        projected = self._linear(x)
        return projected
def example(rank, world_size):
    """Run a single PCGrad step on a DDP-wrapped ``TestNet`` in one worker.

    Args:
        rank: Index of this process; also used as the device index.
        world_size: Total number of processes in the group.
    """
    # Rendezvous address/port are fixed for this single-machine toy run.
    os.environ.update({'MASTER_ADDR': 'localhost', 'MASTER_PORT': '12355'})
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    net = TestNet().to(rank)
    ddp_model = DDP(net, device_ids=[rank])
    l1_criterion = nn.L1Loss()
    mse_criterion = nn.MSELoss()
    optimizer = PCGrad(optim.SGD(ddp_model.parameters(), lr=0.001))

    # One forward pass feeds both objectives.
    inputs = torch.randn(2, 3).to(rank)
    outputs = ddp_model(inputs)
    labels = torch.randn(2, 4).to(rank)
    objectives = [l1_criterion(outputs, labels), mse_criterion(outputs, labels)]

    optimizer.pc_backward(objectives)
    optimizer.step()


if __name__ == '__main__':
    torch.manual_seed(4)
    world_size = 2
    # One process per rank; block until every worker has returned.
    mp.spawn(example, args=(world_size, ), nprocs=world_size, join=True)
Can you try to write and run a script that trains for 100 epochs and uses `DistributedSampler` as the data sampler? My main script is as follows, and the following error still occurs. My environment is `pytorch==1.7.1, torchvision==0.8.2, torchaudio==0.7.2 and cudatoolkit=11.0`.
import argparse
import os
import random
import logging
import numpy as np
import time
import setproctitle
import torch
import torch.backends.cudnn as cudnn
import torch.optim
from models.TransBTS.TransBTS_downsample8x_skipconnection_2 import TransBTS
import torch.distributed as dist
from models import criterions
from data.BraTS import BraTS
from torch.utils.data import DataLoader
from utils.tools import all_reduce_tensor
from utils.pcgrad import PCGrad
from tensorboardX import SummaryWriter
from torch import nn
def _str2bool(value):
    """Convert a command-line token to a real boolean.

    ``argparse`` with ``type=bool`` calls ``bool(token)``, which is True for
    every non-empty string, so ``--amsgrad False`` would still enable the
    option. This converter interprets the text instead.

    Args:
        value: The raw token (or an actual ``bool``).

    Returns:
        The parsed boolean. The empty string maps to False, matching what
        ``bool('')`` returned before.

    Raises:
        argparse.ArgumentTypeError: If the token is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    token = value.strip().lower()
    if token in ('true', 't', 'yes', 'y', '1'):
        return True
    if token in ('false', 'f', 'no', 'n', '0', ''):
        return False
    raise argparse.ArgumentTypeError('expected a boolean value, got {!r}'.format(value))


# Timestamp taken once at import; its date part is the default for --date.
local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
parser = argparse.ArgumentParser()
# Basic Information
parser.add_argument('--user', default='jhcheng', type=str)
parser.add_argument('--experiment', default='TransBTS_IDH', type=str)
parser.add_argument('--date', default=local_time.split(' ')[0], type=str)
parser.add_argument('--description',
                    default='TransBTS,'
                            'training on train.txt!',
                    type=str)
# DataSet Information
parser.add_argument('--root', default='dataset/BraTS/BraTS_IDH', type=str)
parser.add_argument('--train_dir', default='', type=str)
parser.add_argument('--valid_dir', default='', type=str)
parser.add_argument('--mode', default='train', type=str)
parser.add_argument('--train_file', default='IDH_all.txt', type=str)
parser.add_argument('--valid_file', default='IDH_test.txt', type=str)
parser.add_argument('--dataset', default='brats_IDH', type=str)
parser.add_argument('--model_name', default='TransBTS', type=str)
parser.add_argument('--input_C', default=4, type=int)
parser.add_argument('--input_H', default=240, type=int)
parser.add_argument('--input_W', default=240, type=int)
parser.add_argument('--input_D', default=155, type=int)
parser.add_argument('--crop_H', default=128, type=int)
parser.add_argument('--crop_W', default=128, type=int)
parser.add_argument('--crop_D', default=128, type=int)
parser.add_argument('--output_D', default=155, type=int)
# Training Information
parser.add_argument('--lr', default=0.0002, type=float)
parser.add_argument('--weight_decay', default=1e-5, type=float)
# Boolean flags go through _str2bool so that "False"/"0"/"no" really disable them.
parser.add_argument('--amsgrad', default=True, type=_str2bool)
parser.add_argument('--criterion', default='softmax_dice', type=str)
parser.add_argument('--num_class', default=4, type=int)
parser.add_argument('--seed', default=1000, type=int)
parser.add_argument('--no_cuda', default=False, type=_str2bool)
parser.add_argument('--gpu', default='2,3', type=str)
parser.add_argument('--num_workers', default=4, type=int)
parser.add_argument('--batch_size', default=4, type=int)
parser.add_argument('--start_epoch', default=0, type=int)
parser.add_argument('--end_epoch', default=1000, type=int)
parser.add_argument('--save_freq', default=1000, type=int)
parser.add_argument('--resume', default='', type=str)
parser.add_argument('--load', default=True, type=_str2bool)
parser.add_argument('--local_rank', default=0, type=int, help='node rank for distributed training')
# Parsed at import time; the rest of the script reads this module-level `args`.
args = parser.parse_args()
def main_worker():
    """Train the two-output TransBTS model in one distributed process.

    Reads all settings from the module-level ``args``. The process with
    ``args.local_rank == 0`` additionally configures logging, writes
    TensorBoard scalars and saves checkpoints.

    NOTE(review): the nesting below was reconstructed from a paste that had
    lost its indentation -- confirm block boundaries against the real file.
    """
    # Only rank 0 sets up the log file and dumps the parsed arguments.
    if args.local_rank == 0:
        log_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'log', args.experiment+args.date)
        log_file = log_dir + '.txt'
        log_args(log_file)
        logging.info('--------------------------------------This is all argsurations----------------------------------')
        for arg in vars(args):
            logging.info('{}={}'.format(arg, getattr(args, arg)))
        logging.info('----------------------------------------This is a halving line----------------------------------')
        logging.info('{}'.format(args.description))
    # Rendezvous address and port are hard-coded for a single machine.
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '18000'
    # Seed torch (CPU and CUDA), Python's random and NumPy with the same value.
    torch.manual_seed(args.seed)
    torch.cuda.manual_seed(args.seed)
    random.seed(args.seed)
    np.random.seed(args.seed)
    # NOTE(review): world_size is fixed at 2 here, while num_gpu further down is
    # derived from args.gpu -- the two can disagree if --gpu is changed.
    torch.distributed.init_process_group('gloo',rank=args.local_rank,world_size=2)
    torch.cuda.set_device(args.local_rank)
    # TransBTS(...) returns a pair; only the second element is used as the model.
    _, model = TransBTS(dataset='brats', _conv_repr=True, _pe_type="learned")
    #model.cuda(args.local_rank)
    model.to(args.local_rank)
    model = nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank,
                                                find_unused_parameters=True)
    model.train()
    #optimizer = torch.optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.weight_decay, amsgrad=args.amsgrad)
    # Adam wrapped in PCGrad; the wrapper's methods are used throughout the loop.
    optimizer = PCGrad(torch.optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.weight_decay, amsgrad=args.amsgrad))
    # Segmentation criterion is looked up by name; the second criterion is
    # always `cross_entropy` from the same module.
    criterion = getattr(criterions, args.criterion)
    cls_criterion = getattr(criterions,'cross_entropy')
    if args.local_rank == 0:
        checkpoint_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'checkpoint', args.experiment+args.date)
        if not os.path.exists(checkpoint_dir):
            os.makedirs(checkpoint_dir)
    # NOTE(review): `resume` is hard-coded to '' (args.resume is only used in the
    # log message), so os.path.isfile(resume) is False and the else-branch runs.
    resume = ''
    # NOTE(review): the writer is created on every rank but only written to and
    # closed on rank 0.
    writer = SummaryWriter()
    if os.path.isfile(resume) and args.load:
        logging.info('loading checkpoint {}'.format(resume))
        checkpoint = torch.load(resume, map_location=lambda storage, loc: storage)
        model.load_state_dict(checkpoint['state_dict'])
        logging.info('Successfully loading checkpoint {} and training from epoch: {}'
                     .format(args.resume, args.start_epoch))
    else:
        logging.info('re-training!!!')
    train_list = os.path.join(args.root, args.train_dir, args.train_file)
    train_root = os.path.join(args.root, args.train_dir)
    train_set = BraTS(train_list, train_root, args.mode)
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_set)
    logging.info('Samples for train = {}'.format(len(train_set)))
    # Device count from the string length of args.gpu, e.g. '2,3' -> (3+1)//2 = 2.
    # Assumes comma-separated single-character ids -- TODO confirm.
    num_gpu = (len(args.gpu)+1) // 2
    # Each process loads args.batch_size // num_gpu samples per step.
    train_loader = DataLoader(dataset=train_set, sampler=train_sampler, batch_size=args.batch_size // num_gpu,
                              drop_last=True, num_workers=args.num_workers, pin_memory=True)
    start_time = time.time()
    torch.set_grad_enabled(True)
    for epoch in range(args.start_epoch, args.end_epoch):
        train_sampler.set_epoch(epoch) # shuffle
        # Process title shows user and epoch progress.
        setproctitle.setproctitle('{}: {}/{}'.format(args.user, epoch+1, args.end_epoch))
        start_epoch = time.time()
        for i, data in enumerate(train_loader):
            #adjust_learning_rate(optimizer, epoch, args.end_epoch, args.lr)
            optimizer.adjust_learning_rate(epoch, args.end_epoch, args.lr)
            # Each batch is (input, segmentation target, grade).
            x, target,grade = data
            grade = torch.unsqueeze(grade, dim=1).to(torch.float32)
            x = x.cuda(args.local_rank, non_blocking=True)
            target = target.cuda(args.local_rank, non_blocking=True)
            grade = grade.cuda(args.local_rank, non_blocking=True)
            # A single forward pass yields both outputs.
            idh_out,output = model(x)
            loss, loss1, loss2, loss3 = criterion(output, target)
            # The reduced values are only used for logging/TensorBoard;
            # presumably all_reduce_tensor averages across processes -- verify in utils.tools.
            reduce_loss = all_reduce_tensor(loss, world_size=num_gpu).data.cpu().numpy()
            reduce_loss1 = all_reduce_tensor(loss1, world_size=num_gpu).data.cpu().numpy()
            reduce_loss2 = all_reduce_tensor(loss2, world_size=num_gpu).data.cpu().numpy()
            reduce_loss3 = all_reduce_tensor(loss3, world_size=num_gpu).data.cpu().numpy()
            cls_loss = cls_criterion(idh_out,grade)
            cls_reduce_loss = all_reduce_tensor(cls_loss, world_size=num_gpu).data.cpu().numpy()
            if args.local_rank == 0:
                logging.info('Epoch: {}_Iter:{} seg_loss: {:.5f} cls_loss: {:.5f} || 1:{:.4f} | 2:{:.4f} | 3:{:.4f} ||'
                             .format(epoch, i, reduce_loss,cls_reduce_loss, reduce_loss1, reduce_loss2, reduce_loss3))
            # The two objectives handed to PCGrad come from the same forward pass.
            losses = [loss,cls_loss]
            optimizer.zero_grad()
            # NOTE(review): the traceback reported with this script ("Expected to mark
            # a variable ready only once") is raised from this pc_backward call.
            optimizer.pc_backward(losses)
            optimizer.step()
        end_epoch = time.time()
        if args.local_rank == 0:
            # NOTE(review): each `% int(args.end_epoch - k)` raises ZeroDivisionError
            # when args.end_epoch == k (1, 2 or 3); intent looks like "save on the
            # last few epochs" -- confirm.
            if (epoch + 1) % int(args.save_freq) == 0 \
                    or (epoch + 1) % int(args.end_epoch - 1) == 0 \
                    or (epoch + 1) % int(args.end_epoch - 2) == 0 \
                    or (epoch + 1) % int(args.end_epoch - 3) == 0:
                file_name = os.path.join(checkpoint_dir, 'model_epoch_{}.pth'.format(epoch))
                torch.save({
                    'epoch': epoch,
                    'state_dict': model.state_dict(),
                    'optim_dict': optimizer.state_dict(),
                },
                    file_name)
            # These scalars hold the values from the last iteration of the epoch
            # (they are assigned inside the batch loop above).
            writer.add_scalar('lr:', optimizer.param_groups[0]['lr'], epoch)
            writer.add_scalar('seg_loss:', reduce_loss, epoch)
            writer.add_scalar('cls_loss:', cls_reduce_loss, epoch)
            writer.add_scalar('loss1:', reduce_loss1, epoch)
            writer.add_scalar('loss2:', reduce_loss2, epoch)
            writer.add_scalar('loss3:', reduce_loss3, epoch)
        if args.local_rank == 0:
            # Remaining time is extrapolated from the duration of this epoch.
            epoch_time_minute = (end_epoch-start_epoch)/60
            remaining_time_hour = (args.end_epoch-epoch-1)*epoch_time_minute/60
            logging.info('Current epoch time consumption: {:.2f} minutes!'.format(epoch_time_minute))
            logging.info('Estimated remaining training time: {:.2f} hours!'.format(remaining_time_hour))
    if args.local_rank == 0:
        writer.close()
        # Final checkpoint, saved unconditionally on rank 0.
        final_name = os.path.join(checkpoint_dir, 'model_epoch_last.pth')
        torch.save({
            'epoch': args.end_epoch,
            'state_dict': model.state_dict(),
            'optim_dict': optimizer.state_dict(),
        },
            final_name)
    end_time = time.time()
    total_time = (end_time-start_time)/3600
    logging.info('The total training time is {:.2f} hours'.format(total_time))
    logging.info('----------------------------------The training process finished!-----------------------------------')
def adjust_learning_rate(optimizer, epoch, max_epoch, init_lr, power=0.9):
    """Set every parameter group's learning rate with polynomial decay.

    The rate is ``init_lr * (1 - epoch / max_epoch) ** power``, rounded to
    8 decimal places, and is written in place into ``optimizer.param_groups``.

    Args:
        optimizer: Object exposing ``param_groups`` (a list of dicts).
        epoch: Current epoch index.
        max_epoch: Epoch count at which the rate reaches zero.
        init_lr: Learning rate at epoch 0.
        power: Exponent of the decay curve.
    """
    for group in optimizer.param_groups:
        remaining_fraction = 1 - epoch / max_epoch
        decayed = init_lr * np.power(remaining_fraction, power)
        group['lr'] = round(decayed, 8)
def log_args(log_file):
    """Configure the root logger to write to the console and to ``log_file``.

    The root logger level is set to DEBUG and two handlers sharing one
    ``'<timestamp> ===> <message>'`` format are attached: a stream handler
    and a file handler. The directory that should contain ``log_file`` is
    created first, because ``logging.FileHandler`` fails if it is missing.

    Args:
        log_file: Path of the log file to write to.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        '%(asctime)s ===> %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S')

    # Make sure the target directory exists before opening the file.
    parent_dir = os.path.dirname(os.path.abspath(log_file))
    os.makedirs(parent_dir, exist_ok=True)

    # args FileHandler to save log file
    fh = logging.FileHandler(log_file)
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)

    # args StreamHandler to print log to console
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(formatter)

    # add the two Handler
    logger.addHandler(ch)
    logger.addHandler(fh)
if __name__ == '__main__':
    # Restrict the visible GPUs to the ids given by --gpu before checking availability.
    os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu
    # Explicit raise instead of `assert`, which is stripped under `python -O`.
    if not torch.cuda.is_available():
        raise RuntimeError("Currently, we only support CUDA version")
    torch.backends.cudnn.enabled = True
    torch.backends.cudnn.benchmark = True
    main_worker()
obj 2021-03-20 15:43:48 ===> Epoch: 0_Iter:0 seg_loss: 2.94329 cls_loss: 0.63079 || 1:0.0272 | 2:0.0169 | 3:0.0126 ||
obj tensor(2.9168, device='cuda:1', grad_fn=<AddBackward0>) True
tensor(2.9698, device='cuda:0', grad_fn=<AddBackward0>) True
obj tensor(0.4884, device='cuda:1', grad_fn=<BinaryCrossEntropyBackward>) True
obj tensor(0.7731, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward>) True
Traceback (most recent call last):
File "train_IDH.py", line 297, in <module>
main_worker()
File "train_IDH.py", line 216, in main_worker
optimizer.pc_backward(losses)
File "/***/userfolder/workspace/TransBTS-main/utils/pcgrad.py", line 48, in pc_backward
grads, shapes, has_grads = self._pack_grad(objectives)
File "***/userfolder/workspace/TransBTS-main/utils/pcgrad.py", line 97, in _pack_grad
obj.backward(retain_graph=True) #retain_graph=True
File "/***/anaconda3/envs/py37-torch/lib/python3.7/site-packages/torch/tensor.py", line 221, in backward
torch.autograd.backward(self, gradient, retain_graph, create_graph)
File "/homeb/jhcheng/anaconda3/envs/py37-torch/lib/python3.7/site-packages/torch/autograd/__init__.py", line 132, in backward
allow_unreachable=True) # allow_unreachable flag
RuntimeError: Expected to mark a variable ready only once. This error is caused by one of the following reasons: 1) Use of a module parameter outside the `forward` function. Please make sure model parameters are not shared across multiple concurrent forward-backward passes2) Reused parameters in multiple reentrant backward passes. For example, if you use multiple `checkpoint` functions to wrap the same part of your model, it would result in the same set of parameters been used by different reentrant backward passes multiple times, and hence marking a variable ready multiple times. DDP does not support such use cases yet.
Traceback (most recent call last):
File "train_IDH.py", line 297, in <module>
main_worker()
File "train_IDH.py", line 216, in main_worker
optimizer.pc_backward(losses)
File "/***/userfolder/workspace/TransBTS-main/utils/pcgrad.py", line 48, in pc_backward
grads, shapes, has_grads = self._pack_grad(objectives)
File "/***/userfolder/workspace/TransBTS-main/utils/pcgrad.py", line 97, in _pack_grad
obj.backward(retain_graph=True) #retain_graph=True
File "/***/anaconda3/envs/py37-torch/lib/python3.7/site-packages/torch/tensor.py", line 221, in backward
torch.autograd.backward(self, gradient, retain_graph, create_graph)
File "/***/anaconda3/envs/py37-torch/lib/python3.7/site-packages/torch/autograd/__init__.py", line 132, in backward
allow_unreachable=True) # allow_unreachable flag
RuntimeError: Expected to mark a variable ready only once. This error is caused by one of the following reasons: 1) Use of a module parameter outside the `forward` function. Please make sure model parameters are not shared across multiple concurrent forward-backward passes2) Reused parameters in multiple reentrant backward passes. For example, if you use multiple `checkpoint` functions to wrap the same part of your model, it would result in the same set of parameters been used by different reentrant backward passes multiple times, and hence marking a variable ready multiple times. DDP does not support such use cases yet.
Traceback (most recent call last):
File "***/anaconda3/envs/py37-torch/lib/python3.7/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/***/anaconda3/envs/py37-torch/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/***/anaconda3/envs/py37-torch/lib/python3.7/site-packages/torch/distributed/launch.py", line 260, in <module>
main()
File "/***/anaconda3/envs/py37-torch/lib/python3.7/site-packages/torch/distributed/launch.py", line 256, in main
cmd=cmd)
subprocess.CalledProcessError: Command '['/homeb/jhcheng/anaconda3/envs/py37-torch/bin/python', '-u', 'train_IDH.py', '--local_rank=1']' returned non-zero exit status 1.
Hi! I tried combining `DistributedSampler` with PCGrad in the toy example, and it works successfully. My environment is `pytorch==1.7.1` and `cuda=10.2`.
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from pcgrad import PCGrad
class Dataset(torch.utils.data.Dataset):
    """In-memory toy dataset of 100 random (input, label) pairs."""

    def __init__(self) -> None:
        super().__init__()
        # Inputs are drawn first, then labels; both are uniform in [0, 1).
        self._data = torch.rand(100, 3)
        self._label = torch.rand(100, 4)

    def __len__(self):
        """Return the number of samples (always 100)."""
        return 100

    def __getitem__(self, idx):
        """Return the ``(input, label)`` pair stored at position ``idx``."""
        sample = self._data[idx]
        target = self._label[idx]
        return sample, target
class TestNet(nn.Module):
    """Single linear layer (3 -> 4) used as the toy model."""

    def __init__(self) -> None:
        super().__init__()
        self._linear = nn.Linear(in_features=3, out_features=4)

    def forward(self, x):
        """Return the linear projection of ``x``."""
        result = self._linear(x)
        return result
def example(rank, world_size):
    """Train the toy model for 100 epochs with DDP, DistributedSampler and PCGrad.

    Args:
        rank: Index of this process; also used as the device index.
        world_size: Total number of processes in the group.
    """
    # Rendezvous address/port are fixed for this single-machine toy run.
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    model = TestNet().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    loss1_fn, loss2_fn = nn.L1Loss(), nn.MSELoss()
    optimizer = PCGrad(optim.SGD(ddp_model.parameters(), lr=0.001))

    dataset = Dataset()
    sampler = torch.utils.data.distributed.DistributedSampler(dataset)
    loader = torch.utils.data.DataLoader(dataset=dataset,
                                         sampler=sampler,
                                         batch_size=10,
                                         drop_last=True,
                                         num_workers=2,
                                         pin_memory=True)

    for ep in range(100):
        # Re-seed the sampler's shuffle each epoch; without this call every
        # epoch iterates the data in the same order.
        sampler.set_epoch(ep)
        for data, label in loader:
            outputs = ddp_model(data.to(rank))
            label = label.to(rank)
            # Both objectives are computed from the same forward pass.
            loss1, loss2 = loss1_fn(outputs, label), loss2_fn(outputs, label)
            losses = [loss1, loss2]
            optimizer.pc_backward(losses)
            optimizer.step()
    print('workder {}: toy training processing ends'.format(rank))


if __name__ == '__main__':
    torch.manual_seed(4)
    world_size = 2
    # One process per rank; block until every worker has returned.
    mp.spawn(example, args=(world_size, ), nprocs=world_size, join=True)
Yeah, your script also works in my environment. Strangely, my code works when PCGrad is not used. Can you give me some advice based on the error message? Can you share your WeChat?
"Expected to mark a variable ready only once. This error is caused by one of the following reasons: 1) Use of a module parameter outside the `forward` function. Please make sure model parameters are not shared across multiple concurrent forward-backward passes2) Reused parameters in multiple reentrant backward passes. For example, if you use multiple `checkpoint` functions to wrap the same part of your model, it would result in the same set of parameters been used by different reentrant backward passes multiple times, and hence marking a variable ready multiple times. DDP does not support such use cases yet."
Hello, I made a minor change to your script; the error reappears when the model has two outputs.
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from utils.pcgrad import PCGrad
class Dataset(torch.utils.data.Dataset):
    """Toy dataset holding 100 random input/label rows in memory."""

    def __init__(self) -> None:
        super().__init__()
        # Draw order matters for reproducibility: inputs first, then labels.
        self._data = torch.rand(100, 3)
        self._label = torch.rand(100, 4)

    def __len__(self):
        """Return the fixed dataset size of 100."""
        return 100

    def __getitem__(self, idx):
        """Return ``(input, label)`` for index ``idx``."""
        features = self._data[idx]
        label = self._label[idx]
        return features, label
class TestNet(nn.Module):
    """Toy network with a shared trunk and two output heads."""

    def __init__(self) -> None:
        super().__init__()
        # Layers are created in this order: trunk (3 -> 4 -> 3), then the two heads (3 -> 4 each).
        self._linear_1 = nn.Linear(in_features=3, out_features=4)
        self._linear_2 = nn.Linear(in_features=4, out_features=3)
        self._linear_3 = nn.Linear(in_features=3, out_features=4)
        self._linear = nn.Linear(in_features=3, out_features=4)

    def forward(self, x):
        """Return ``(head_3(trunk(x)), head(trunk(x)))``."""
        hidden = self._linear_2(self._linear_1(x))
        first_head = self._linear_3(hidden)
        second_head = self._linear(hidden)
        return first_head, second_head
def example(rank, world_size):
    """Train the two-output toy model with DDP, DistributedSampler and PCGrad.

    Each loss is computed from a different model output of the same forward
    pass, and both are handed to ``optimizer.pc_backward``.

    Args:
        rank: Index of this process; also used as the device index.
        world_size: Total number of processes in the group.
    """
    # Rendezvous address/port are fixed for this single-machine toy run.
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    model = TestNet().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    loss1_fn, loss2_fn = nn.L1Loss(), nn.MSELoss()
    optimizer = PCGrad(optim.SGD(ddp_model.parameters(), lr=0.001))

    dataset = Dataset()
    sampler = torch.utils.data.distributed.DistributedSampler(dataset)
    loader = torch.utils.data.DataLoader(dataset=dataset,
                                         sampler=sampler,
                                         batch_size=10,
                                         drop_last=True,
                                         num_workers=2,
                                         pin_memory=True)

    for ep in range(100):
        # Re-seed the sampler's shuffle each epoch; without this call every
        # epoch iterates the data in the same order.
        sampler.set_epoch(ep)
        for data, label in loader:
            output_1, output_2 = ddp_model(data.to(rank))
            label = label.to(rank)
            loss1, loss2 = loss1_fn(output_1, label), loss2_fn(output_2, label)
            # Field {3} is loss2; the original used {2} twice and so printed
            # loss1 under both labels.
            msg = 'Iter {0:}, Epoch {1:.4f}, seg_loss {2:.7f}, cls_loss {3:.7f}'.format(ep + 1, (ep + 1) / 2,
                                                                                       loss1, loss2)
            print(msg)
            losses = [loss1, loss2]
            optimizer.pc_backward(losses)
            optimizer.step()
    print('workder {}: toy training processing ends'.format(rank))


if __name__ == '__main__':
    # GPU ids are specific to the machine this script was run on.
    os.environ['CUDA_VISIBLE_DEVICES'] = '1,3'
    torch.manual_seed(4)
    world_size = 2
    # One process per rank; block until every worker has returned.
    mp.spawn(example, args=(world_size, ), nprocs=world_size, join=True)
I solved it! The model needs to be split into two independent models. But how can this be avoided when a single model has two outputs?
I guess that it is caused by `retain_graph=True`, which is not appropriate for `DDP` (https://github.com/pytorch/pytorch/issues/47260). `retain_graph=True` keeps the gradient graph after the `backward` operation. Therefore, we can use PCGrad with one `forward` and multiple `backward` operations, one for each loss. You can try modifying the implementation of PCGrad as described in the issue https://github.com/WeiChengTseng/Pytorch-PCGrad/issues/2; note that this also changes the usage of PCGrad.
Since there is no follow-up information, I close this issue.
@WeiChengTseng Can you share your WeChat? I want to chat more about this work with you.
To be honest, I don't have WeChat, but we can chat more about this work via email.
Hi, I am using it in my task, and the following errors occur. Does it support distributed training?