Problem

Training with OneFlow DDP leaks memory when the model is deepcopy-ed inside the training loop (see the note after the code: with the deepcopy line removed, the leak does not occur).

Minimal reproducible code

```python
import oneflow as flow
import oneflow.nn as nn
from oneflow.nn.parallel import DistributedDataParallel as ddp
from copy import deepcopy
train_x = [
    flow.tensor([[1, 2], [2, 3]], dtype=flow.float32),
    flow.tensor([[4, 6], [3, 1]], dtype=flow.float32),
]
train_y = [
    flow.tensor([[8], [13]], dtype=flow.float32),
    flow.tensor([[26], [9]], dtype=flow.float32),
]
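# The targets satisfy y = x @ [[2], [3]], so SGD should drive w toward (2, 3).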

class Model(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.lr = 0.01
        self.iter_count = 500 * 1024
        self.w = flow.nn.Parameter(flow.tensor([[0], [0]], dtype=flow.float32))

    def forward(self, x):
        x = flow.matmul(x, self.w)
        return x

def is_parallel(model):
    # Returns True if model is of type DDP
    return type(model) in (nn.parallel.DistributedDataParallel,)


def de_parallel(model):
    # De-parallelize a model: returns single-GPU model if model is of type DP or DDP
    return model.module if is_parallel(model) else model
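# NOTE (assumption, not verified): in OneFlow 0.8.x,
# oneflow.nn.parallel.DistributedDataParallel appears to be a function that
# registers hooks on the module and returns the same instance, in which case
# is_parallel() never matches and de_parallel(m) returns the hooked model m.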
m = Model().to("cuda")
m = ddp(m)
loss = flow.nn.MSELoss(reduction="sum")
optimizer = flow.optim.SGD(m.parameters(), m.lr)
for i in range(0, m.iter_count):
    rank = flow.env.get_rank()
    x = train_x[rank].to("cuda")
    y = train_y[rank].to("cuda")

    y_pred = m(x)
    l = loss(y_pred, y)
    if (i + 1) % 500 == 0:
        print(f"{i+1}/{m.iter_count} loss:{l}")
    if rank in {-1, 0}:
        ckpt = {
            "model": deepcopy(de_parallel(m)),
        }
        del ckpt

    optimizer.zero_grad()
    l.backward()
    optimizer.step()
```
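For reference, one way to watch the growth from inside each worker (not part of the original repro; uses the third-party psutil package, and `log_rss` is a hypothetical helper name):

```python
import os
import psutil  # third-party: pip install psutil

_proc = psutil.Process(os.getpid())

def log_rss(step):
    # Print this worker's resident set size in MiB; calling this every few
    # hundred iterations should make steady host-memory growth visible.
    print(f"step {step}: rss={_proc.memory_info().rss / 2**20:.1f} MiB")
```

If the leak lives in device memory rather than host memory, RSS alone may not show it.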
Note: without the line `"model": deepcopy(de_parallel(m))`, the memory leak does not occur.

Run with:

```bash
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./ddp_train.py
```

Version info

0.8.1.dev20230102+cu117
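A possible workaround sketch (an assumption, not verified on this build: it presumes the leak is tied to deepcopy-ing the DDP-prepared module object; `snapshot_state` is a hypothetical helper): snapshot detached CPU copies of the tensors instead of the module itself.

```python
def snapshot_state(model):
    # Clone detached CPU copies of parameters/buffers instead of
    # deepcopy-ing the module object (which carries DDP hooks).
    return {k: v.detach().to("cpu").clone() for k, v in model.state_dict().items()}

# In the training loop, replacing the deepcopy block:
#   if rank in {-1, 0}:
#       ckpt = {"model": snapshot_state(de_parallel(m))}
#       del ckpt
```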