pyg-team / pytorch_geometric

Graph Neural Network Library for PyTorch
https://pyg.org
MIT License
21.14k stars 3.63k forks source link

Using `DistributedDataParallel` with PyG raises `TypeError: default_collate: batch must contain ...; found <class 'torch_geometric.data.data.Data'>` #8475

Open Sayan-m90 opened 10 months ago

Sayan-m90 commented 10 months ago

🚀 The feature, motivation and pitch

I did not see an example for this, but I am trying to run a PyG model using pytorch DDP with nccl as in this example multigpu But if I pass a PyG dataloader instead of the Dataloader from torch.util.data, I get the following error TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'torch_geometric.data.data.Data'> in line 104

Any help on how this can be done is much appreciated. I am running a pretty large number of graphs so I need the DDP for performance.

Alternatives

No response

Additional context

No response

akihironitta commented 10 months ago

@Sayan-m90 Thank you for reporting this issue. It'd be easier for us to comment or investigate this further if you could share your full code, error message and env details.

Sayan-m90 commented 9 months ago
from torch_geometric.datasets import Planetoid
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset
import torch
import os
from floe.api import ComputeCube, DecimalParameter, BooleanParameter, IntegerParameter
from orionplatform import RecordPortsMixin
import os.path as osp
from typing import List, Dict, Tuple
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
from torch_geometric.datasets import QM7b
import torch.nn.functional as F
from torch_geometric.loader import DataLoader as GDL
from torch.utils.data.distributed import DistributedSampler
from ogb.graphproppred import PygGraphPropPredDataset as OGBDataset
from torch.distributed import init_process_group
import torch.multiprocessing as mp
from torch.distributed import destroy_process_group
from .helper_files import get_open_port
import random
import torch_geometric.transforms as T

class CustomPairDataset(Dataset):

    def __init__(self, udata: List[Tuple[Data, Data, float]]):
        self.udata = udata

    def __len__(self):
        return len(self.udata)

    def __getitem__(self, idx):
        print('self udata', self.udata[idx])
        return self.udata[idx][0], self.udata[idx][1], self.udata[idx][2]

def ddp_setup(rank, world_size, freeport):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
        freeport:
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = f"{freeport}"
    # os.environ["NCCL_DEBUG"] = "INFO"
    init_process_group(backend="nccl", rank=rank, world_size=world_size, )
    torch.cuda.set_device(rank)

class GCN(torch.nn.Module):
    def __init__(self, num_features, num_classes):
        super().__init__()
        self.conv1 = GCNConv(num_features, 16)
        self.conv2 = GCNConv(16, num_classes)

    def forward(self, x, edge_index):
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

def nccltrainer(rank: int, world_size: int, epochs: int, batch_size: int,
                num_workers: int, freeport: int,
                model: torch.nn.Module, optimizer: torch.optim.Optimizer, verbose: bool,
                train_pair_graph: Dataset, test_pair_graph: Dataset, ):
    print('inside nccl ', rank, world_size, train_pair_graph)
    ddp_setup(rank, world_size, freeport)

    ptr = None
    if train_pair_graph is not None:
        ptr = GDL(train_pair_graph, batch_size=batch_size, sampler=DistributedSampler(train_pair_graph))  # , \

    trainer = Trainer(model, ptr, None, optimizer, gpu_id=rank, batch_size=batch_size)
    trainer.train(epochs)

    trainer.train(epochs)
    destroy_process_group()
    torch.cuda.empty_cache()

class Trainer:
    def __init__(
            self,
            model: torch.nn.Module,
            train_data: DataLoader,
            test_data: None,
            optimizer: torch.optim.Optimizer,
            gpu_id: int,
            batch_size: int,
    ) -> None:

        self.gpu_id = gpu_id
        model.to(torch.double)
        self.train_data = train_data
        # self.test_data = test_data
        self.optimizer = optimizer

        self.model = model

        self.model.to(gpu_id)
        print('moved model to gpu ', gpu_id)
        self.model = DDP(self.model, device_ids=[self.gpu_id], )  # find_unused_parameters=False,
        #        self.loss_criterion = loss_criterion
        self.batch_size = batch_size
        self.train_loss, self.train_mae, self.train_mse = [], [], []
        self.val_loss, self.val_mae, self.val_mse = [], [], []
        print('set init for trainer ', self.model, self.gpu_id)

    def train(self, max_epochs: int):
        print('inside train')
        for epoch in range(0, max_epochs):
            self.model.train()
            print(f'epoch {epoch} pre batch')
            for batch, batch1, y in self.train_data:
                print(f'epoch {epoch}, batch {batch}')
                self.optimizer.zero_grad()
                out = self.model(batch.x, batch.edge_index)
                print('model out ', out)
                out = out[:batch.batch_size]
                print('trimmed out ', out)
                loss = F.nll_loss(out, batch.y[:batch.batch_size].long())
                loss.backward()
                self.optimizer.step()

def loadSingle_PytorchGeom():
    #    dataset = 'Cora'
    #    path = osp.join(osp.dirname(osp.realpath(__file__)), '..', 'data', 'Planetoid')
    #    dataset = Planetoid(path, dataset)
    #    train_loader = dataset[0]
    #    test_loader = dataset[0]
    #    print('found data ', train_loader, '\n dataset ', dataset, test_loader)
    dataset_name = 'ogbg-molbace'
    root = './data/ogbg-molbace'

    # Download and process the dataset on main process.
    dataset = OGBDataset(dataset_name, root, )

    print('dataset ', dataset, dataset[0], len(dataset))
    print('dataset1 ', dataset[1])
    # for data in dataset:
    return dataset

def loadRandom_PytorchGeom(num_nodes, features_dim, edge_dim, num_edges, nummols):
    udata = []
    for i in range(nummols):
        data1 = Data(
            x=torch.rand(num_nodes, features_dim),
            edge_index=torch.randint(0, num_nodes, (2, num_edges)),
            edge_attr=torch.rand(num_nodes, edge_dim)
        )
        data2 = Data(
            x=torch.rand(num_nodes, features_dim),
            edge_index=torch.randint(0, num_nodes, (2, num_edges)),
            edge_attr=torch.rand(num_nodes, edge_dim)
        )

        udata.append([data1, data2, random.uniform(0, 1)])
    return udata

def begin(self):
    dataset_pyg = loadSingle_PytorchGeom()
    num_nodes, features_dim, edge_dim, num_edges = 10, 6, 3, 10
    batch_size, epoch, verbose = 64, 2, True
    dataset = loadRandom_PytorchGeom(num_nodes, features_dim, edge_dim, num_edges, batch_size)
    datasetCl = CustomPairDataset(dataset)
    model = GCNConv(features_dim, 2)

    optimizer = torch.optim.Adam(model.parameters(),
                                 lr=0.001)
    world_size = torch.cuda.device_count()
    free_port = get_open_port()
    torch.cuda.mem_get_info()
    print('world size ', world_size)

    print('out of trained model with data')
    try:
        if optionforspawn == 0:
            mp.spawn(nccltrainer, args=(world_size, epoch, batch_size,
                                        1, free_port,
                                        model, optimizer, verbose,
                                        datasetCl, None), nprocs=world_size)

        elif optionforspawn == 1:
            mp.spawn(nccltrainer, args=(world_size, epoch, batch_size,
                                        1, free_port,
                                        model, optimizer, verbose,
                                        dataset_pyg, None), nprocs=world_size)
        elif optionforspawn == 2:
            mp.spawn(nccltrainer, args=(world_size, epoch, batch_size,
                                        1, free_port,
                                        model, optimizer, verbose,
                                        None, None), nprocs=world_size)
    except Exception as ve:
        print('caught value error for spawn Dataset ', ve)

 if __name__ == '__main__':
     begin()
Sayan-m90 commented 9 months ago

Here is a snippet apologies for the delay. My only concern is loading the data in a multi GPU DDP setup. In this regard, this is where I am stuck:

rusty1s commented 9 months ago

Sorry, I have a hard time to understand your example. Where are you passing the data loader in the first-place? It doesn't seem to be used anywhere.

Sayan-m90 commented 9 months ago

ahh I removed that line while trying to remove some commented out code. It should make sense now,using it right after I load the dataloader

rusty1s commented 9 months ago

I am getting ValueError: bad value(s) in fds_to_keep when running your script.

Sayan-m90 commented 9 months ago

let me try re-running, can you tell me the line number/reference line? I don't see fds_to_keep in the code. If its from the loadSingle_PytorchGeom function, I am not using it for this run

rusty1s commented 9 months ago

Got it. For datasetCl, the script works for me (except for some dtype issue since model is moved to torch.double). Which PyTorch lightning version are you using?

Sayan-m90 commented 9 months ago

I am not using pytorch lightning for my code. I can if it is recommended. I was looking to optimize the dataloading part since i have a large amount of data and that is my current bottleneck. Here are my torchgeometric and pytorch versions:

torch-geometric==2.3.1
torch==2.1.1
rusty1s commented 9 months ago

I am not using pytorch lightning for my code.

Yeah, sorry. I misread. Since I am not able to reproduce, I am not so sure what's the issue here. Can you confirm that it is picking up the PyG DataLoader rather than the one coming from torch.utils.data.DataLoader?

Sayan-m90 commented 9 months ago

yes indeed! here are the relevant lines:

from torch_geometric.loader import DataLoader as GDL 
ptr = GDL(train_pair_graph, batch_size=batch_size, sampler=DistributedSampler(train_pair_graph))

so far my train_pair_graph is a list. it works for length 1000 but I have >5000 elements in train_pair_graph, am getting an error: RuntimeError: unable to mmap 1480 bytes from file </torch_1_1816571983_62963>: Cannot allocate memory (12)

Please help with the dataloading, I thought the code ran on your end, what seems to be the issue now?

rusty1s commented 9 months ago

This looks like you are running out of shared memory.