NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

Help, Dali and GDS suitable source code needed. #4720

Closed · aiinfralab closed this 1 year ago

aiinfralab commented 1 year ago

For our current project, GPU Direct Storage is very important.

So, I'm trying to figure out how to use Dali and GDS to speed up learning, but I haven't found a suitable example.

In some samples I found earlier, POSIX appears to be faster than DALI with GDS, so I couldn't use that code. It seems to call the GDS pipeline's run() from PyTorch's forward() function, which reconstructs the GDS pipeline every time and increases the overhead.

Could you provide me with an optimized training source using DALI and GDS?

I need the entire source, from image loader -> pipeline -> model training, to write a usage guide about DALI for our company.

JanuszL commented 1 year ago

Hi @aiinfralab,

Thank you for reaching out. Please start with this guide to learn how to configure the numpy reader operator to use GDS. Then check these examples to learn how to plug DALI into a PyTorch training. You don't have to call pipeline.run() in your forward function manually and reconstruct the pipeline. There are also a couple of end-to-end examples with PyTorch. Also, you might find this blog post useful.
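
Roughly, the idea is something like this minimal sketch (the data directory below is only a placeholder): a numpy reader with device='gpu', which enables GDS, wrapped in a DALIGenericIterator, so the training loop just consumes batches and never calls pipeline.run() from forward():

from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
from nvidia.dali.plugin.pytorch import DALIGenericIterator

@pipeline_def(batch_size=64, num_threads=4, device_id=0)
def gds_pipe(data_dir):
    # device='gpu' lets the numpy reader use GPUDirect Storage when available
    return fn.readers.numpy(device='gpu', file_root=data_dir, name="Reader")

pipe = gds_pipe(data_dir="/path/to/npy/files")  # placeholder directory of .npy samples
loader = DALIGenericIterator(pipe, ["data"], reader_name="Reader")
for batch in loader:
    x = batch[0]["data"]  # already a CUDA tensor, ready for the model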

If you can tell us more about the workflow and requirements you have I can provide more detailed guidelines.

aiinfralab commented 1 year ago

Hi @JanuszL

Thanks for your help. Additionally, I have to write a simple guide about DALI and GDS (especially GDS) for our customer, so the code below fits my purpose well: it is simple and easy to understand.

https://docs.nvidia.com/deeplearning/dali/user-guide/docs/examples/frameworks/pytorch/pytorch-lightning.html

As far as I know, I need to use DALI's readers.numpy to use GDS. Is it possible to switch to GDS by modifying only the part below, without changing the rest of the source code?

@pipeline_def
def GetMnistPipeline(device, shard_id=0, num_shards=1):
    jpegs, labels = fn.readers.caffe2(path=data_path, shard_id=shard_id, num_shards=num_shards, random_shuffle=True, name="Reader")
    images = fn.decoders.image(jpegs,
                               device='mixed' if device == 'gpu' else 'cpu',
                               output_type=types.GRAY)
    images = fn.crop_mirror_normalize(images,
                                      dtype=types.FLOAT,
                                      std=[0.3081 * 255],
                                      mean=[0.1307 * 255],
                                      output_layout="CHW")
    if device == "gpu":
        labels = labels.gpu()
    # PyTorch expects labels as INT64
    labels = fn.cast(labels, dtype=types.INT64)
    return images, labels

JanuszL commented 1 year ago

Hi @aiinfralab,

I would rather use the examples from the numpy reader directly:

@pipeline_def(batch_size=batch_size, num_threads=3, device_id=0)
def pipe_gds(device='gpu', shard_id=0, num_shards=1):
    # name="Reader" lets the iterator query the dataset size via reader_name
    data = fn.readers.numpy(device=device, file_root=data_dir, files=files,
                            shard_id=shard_id, num_shards=num_shards, name="Reader")
    return data

and wrap it inside the PyTorch Lightning loader:


class DALITestModel(TestModel):
    def __init__(self):
        super().__init__()

    def prepare_data(self):
      # no preparation is needed in DALI
      pass

    def setup(self, stage=None):
        device_id = self.local_rank
        shard_id = self.global_rank
        num_shards = self.trainer.world_size
        mnist_pipeline = pipe_gds(batch_size=BATCH_SIZE, device='gpu', device_id=device_id, shard_id=shard_id,
                                       num_shards=num_shards, num_threads=8)
        self.train_loader = DALIGenericIterator(mnist_pipeline, ["data"], reader_name="Reader",
                                                       last_batch_policy=LastBatchPolicy.PARTIAL)
    def train_dataloader(self):
        return self.train_loader

    def process_batch(self, batch):
        x = batch[0]["data"]
        return tuple(x)

aiinfralab commented 1 year ago

Hi @JanuszL

Thank you so much for your response. Your answer was really, really helpful to me. :-)

I have one more question. The code I am testing reads the training data from the path below: /raid/DALI_extra/db/MNIST/training/ (https://github.com/NVIDIA/DALI_extra.git). There are two files in that path: data.mdb and lock.mdb.

If I want to read those files with readers.numpy in the pipeline you mentioned, do I need to convert them to numpy format?

JanuszL commented 1 year ago

Hi @aiinfralab,

I'm sorry, but we don't have the MNIST dataset in the NumPy format. The mentioned data.mdb and lock.mdb are LMDB database files in caffe2 format. They can be read by the caffe2 reader operator, not the numpy one. What you need to do is download MNIST and save each sample as a numpy array on disk.
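
One simple way to do that (a rough sketch using torchvision, which your script already imports; the output directory is just an example) is:

import os
import numpy as np
from torchvision.datasets import MNIST

out_dir = "./mnist_npy/train"  # example output directory
os.makedirs(out_dir, exist_ok=True)
# save every training image as its own .npy file so fn.readers.numpy (and GDS) can read it
for idx, (img, _) in enumerate(MNIST(os.getcwd(), train=True, download=True)):
    arr = np.asarray(img, dtype=np.uint8)[None, :, :]  # add a channel dim -> (1, 28, 28)
    np.save(os.path.join(out_dir, f"{idx:05d}.npy"), arr)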

aiinfralab commented 1 year ago

Hi @JanuszL

Thank you for your response.

I want to use a simple deep learning source to compare the performance of a case using only DALI against a case using DALI with GDS. I want to show better results when using GDS, but I can't find a suitable source for the comparison.

In the examples provided by NVIDIA DALI, the training data is in LMDB format, while for GDS to actually work I must use readers.numpy and the data must be in NumPy format, so it is difficult to find a source that compares the cases with and without GDS under the same conditions.


I put together the entire source based on the code you gave me and tried to run it.

First of all, I removed reader_name from the parameters of the DALIClassificationIterator because the reader name was not recognized (does this affect the error?).

For now, here is the full source

import torch
from torch.nn import functional as F
from torch import nn
from pytorch_lightning import LightningModule
from pytorch_lightning import Trainer
from torch.optim import Adam
from torchvision.datasets import MNIST
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from nvidia.dali import pipeline_def
import os
import warnings
import nvidia.dali as dali
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy
from nvidia.dali.plugin.pytorch import DALIGenericIterator
warnings.filterwarnings("ignore")

BATCH_SIZE = 64
# Path to MNIST dataset
#data_path = os.path.join(os.environ['DALI_EXTRA_PATH'], 'db/MNIST/training/')
data_path = "./data/train/"

@pipeline_def(batch_size=BATCH_SIZE, num_threads=3, device_id=0)
def pipe_gds():
    data = fn.readers.numpy(device='gpu', file_root=data_path ) # Path to MNIST dataset
    return data

class LitMNIST(LightningModule):

  def __init__(self):
    super().__init__()

    # mnist images are (1, 28, 28) (channels, width, height)
    self.layer_1 = torch.nn.Linear(28 * 28, 128)
    self.layer_2 = torch.nn.Linear(128, 256)
    self.layer_3 = torch.nn.Linear(256, 10)

  def forward(self, x):
    batch_size, channels, width, height = x.size()

    # (b, 1, 28, 28) -> (b, 1*28*28)
    x = x.view(batch_size, -1)
    x = self.layer_1(x)
    x = F.relu(x)
    x = self.layer_2(x)
    x = F.relu(x)
    x = self.layer_3(x)

    x = F.log_softmax(x, dim=1)
    return x

  def process_batch(self, batch):
      return batch

  def training_step(self, batch, batch_idx):
      x, y = self.process_batch(batch)
      logits = self(x)
      loss = F.nll_loss(logits, y)
      return loss

  def cross_entropy_loss(self, logits, labels):
      return F.nll_loss(logits, labels)

  def configure_optimizers(self):
      return Adam(self.parameters(), lr=1e-3)

  def prepare_data(self):
      # download data only
      self.mnist_train = MNIST(os.getcwd(), train=True, download=True, transform=transforms.ToTensor())

class DALITestMNIST(LitMNIST):
    def __init__(self):
        super().__init__()

    def prepare_data(self):
      # no preparation is needed in DALI
      pass

    def setup(self, stage=None):
        device_id = self.local_rank
        shard_id = self.global_rank
        num_shards = self.trainer.world_size
        mnist_pipeline = pipe_gds(batch_size=BATCH_SIZE, device_id=device_id,
                                       num_threads=4)

        self.train_loader = DALIClassificationIterator(mnist_pipeline,
                                                       last_batch_policy=LastBatchPolicy.PARTIAL)

    def train_dataloader(self):
        return self.train_loader

    def process_batch(self, batch):
        x = batch[0]["data"]
        return tuple(x)

if 'PL_TRAINER_GPUS' in os.environ:
    os.environ.pop('PL_TRAINER_GPUS')
model = DALITestMNIST()
trainer = Trainer(accelerator='gpu', max_epochs=5, devices=1, num_sanity_val_steps=0)
trainer.fit(model)

When I run it, I get the error below, but there are actually two files in the ./data/train path, and the size, 376320128 bytes, is the same.

(base) deepcluster@research01:/raid/esgoh/final$ python3 gds-mnist.py
/home/deepcluster/miniconda3/lib/python3.10/site-packages/nvidia/dali/backend.py:46: Warning: DALI support for Python 3.10 is experimental and some functionalities may not work.
  deprecation_warning("DALI support for Python {0}.{1} is experimental and some "
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
Traceback (most recent call last):
  File "/raid/esgoh/final/gds-mnist.py", line 108, in <module>
    trainer.fit(model)
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/pytorch_lightning/trainer/trainer.py", line 520, in fit
    call._call_and_handle_interrupt(
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/pytorch_lightning/trainer/call.py", line 44, in _call_and_handle_interrupt
    return trainer_fn(*args, **kwargs)
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/pytorch_lightning/trainer/trainer.py", line 559, in _fit_impl
    self._run(model, ckpt_path=ckpt_path)
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/pytorch_lightning/trainer/trainer.py", line 896, in _run
    call._call_setup_hook(self)  # allow user to setup lightning_module in accelerator environment
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/pytorch_lightning/trainer/call.py", line 83, in _call_setup_hook
    _call_lightning_module_hook(trainer, "setup", stage=fn)
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/pytorch_lightning/trainer/call.py", line 142, in _call_lightning_module_hook
    output = fn(*args, **kwargs)
  File "/raid/esgoh/final/gds-mnist.py", line 93, in setup
    self.train_loader = DALIClassificationIterator(mnist_pipeline,
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/nvidia/dali/plugin/pytorch.py", line 407, in __init__
    super(DALIClassificationIterator, self).__init__(pipelines, ["data", "label"],
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/nvidia/dali/plugin/pytorch.py", line 194, in __init__
    self._first_batch = DALIGenericIterator.__next__(self)
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/nvidia/dali/plugin/pytorch.py", line 211, in __next__
    outputs = self._get_outputs()
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/nvidia/dali/plugin/base_iterator.py", line 297, in _get_outputs
    outputs.append(p.share_outputs())
  File "/home/deepcluster/miniconda3/lib/python3.10/site-packages/nvidia/dali/pipeline.py", line 1000, in share_outputs
    return self._pipe.ShareOutputs()
RuntimeError: Critical error in pipeline:
Error when executing GPU operator readers__Numpy encountered:
Can't allocate 24092082176 bytes on device 0.
Current pipeline object is no longer valid.
(base) deepcluster@research01:/raid/esgoh/final$

Where did the size 24092082176 come from in the error message?

I also tried with 8 GPUs (num_threads=8), and the error below occurred on every GPU.

Error when executing GPU operator readers__Numpy encountered:
Can't allocate 24092082176 bytes on device [0~7].
Current pipeline object is no longer valid.

If I change BATCH_SIZE from 64 to 32, the error below occurs.

RuntimeError: [/opt/dali/dali/pipeline/data/tensor_list.cc:1012] Assert on "IsDenseTensor()" failed: The batch must be representable tensor - it must has uniform shape and be allocated in contiguous memory.
Stacktrace (43 entries):

JanuszL commented 1 year ago

Hi,

You can try out something like this:


# based on https://towardsdatascience.com/mnist-handwritten-digits-classification-from-scratch-using-python-numpy-b08e401c4dab

import numpy as np
import requests, gzip, os, hashlib

path="./data/raw/"
def fetch(url):
    if not os.path.isdir(path):
        os.makedirs(path)
    fp = os.path.join(path, hashlib.md5(url.encode('utf-8')).hexdigest())
    if os.path.isfile(fp):
        with open(fp, "rb") as f:
            data = f.read()
    else:
        with open(fp, "wb") as f:
            data = requests.get(url).content
            f.write(data)
    return np.frombuffer(gzip.decompress(data), dtype=np.uint8).copy()

X = fetch("http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz")[0x10:].reshape((-1, 1, 28, 28))
Y = fetch("http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz")[8:]
X_test = fetch("http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz")[0x10:].reshape((-1, 1, 28*28))
Y_test = fetch("http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz")[8:]

# save each sample as a numpy file
data_path = "./data/train/"
test_path = "./data/test/"
data_path_img = os.path.join(data_path, "img")
data_path_lab = os.path.join(data_path, "lab")
test_path_img = os.path.join(test_path, "img")
test_path_lab = os.path.join(test_path, "lab")

for path, data in zip([data_path_img, data_path_lab, test_path_img, test_path_lab], [X, Y, X_test, Y_test]):
    if not os.path.isdir(path):
        os.makedirs(path)
    for idx, d in enumerate(data):
        file = f"{path}/{idx:05d}"
        np.save(file, d)

import torch
from torch.nn import functional as F
from torch import nn
from pytorch_lightning import LightningModule
from pytorch_lightning import Trainer
from torch.optim import Adam
from torchvision.datasets import MNIST
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from nvidia.dali import pipeline_def
import os
import warnings
import nvidia.dali as dali
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy
from nvidia.dali.plugin.pytorch import DALIGenericIterator
warnings.filterwarnings("ignore")

BATCH_SIZE = 64

@pipeline_def(batch_size=BATCH_SIZE, num_threads=3, device_id=0)
def pipe_gds():
    seed = np.random.randint(0,2**63)
    # the same seed for both readers to make them in sync
    images = fn.readers.numpy(device='gpu', file_root=data_path_img, random_shuffle=True, name="Reader", seed=seed) # Path to MNIST dataset
    labels = fn.readers.numpy(device='gpu', file_root=data_path_lab, random_shuffle=True, seed=seed)
    images = fn.crop_mirror_normalize(images,
                                      dtype=types.FLOAT,
                                      std=[0.3081 * 255],
                                      mean=[0.1307 * 255],
                                      output_layout="CHW")
    # PyTorch expects labels as INT64
    labels = fn.cast(labels, dtype=types.INT64)
    return images, labels

class LitMNIST(LightningModule):

  def __init__(self):
    super().__init__()

    # mnist images are (1, 28, 28) (channels, width, height)
    self.layer_1 = torch.nn.Linear(28 * 28, 128)
    self.layer_2 = torch.nn.Linear(128, 256)
    self.layer_3 = torch.nn.Linear(256, 10)

  def forward(self, x):
    batch_size, channels, width, height = x.size()

    # (b, 1, 28, 28) -> (b, 1*28*28)
    x = x.view(batch_size, -1)
    x = self.layer_1(x)
    x = F.relu(x)
    x = self.layer_2(x)
    x = F.relu(x)
    x = self.layer_3(x)

    x = F.log_softmax(x, dim=1)
    return x

  def process_batch(self, batch):
      return batch

  def training_step(self, batch, batch_idx):
      x, y = self.process_batch(batch)
      logits = self(x)
      loss = F.nll_loss(logits, y)
      return loss

  def cross_entropy_loss(self, logits, labels):
      return F.nll_loss(logits, labels)

  def configure_optimizers(self):
      return Adam(self.parameters(), lr=1e-3)

  def prepare_data(self):
      # download data only
      self.mnist_train = MNIST(os.getcwd(), train=True, download=True, transform=transforms.ToTensor())

class DALITestMNIST(LitMNIST):
    def __init__(self):
        super().__init__()

    def prepare_data(self):
      # no preparation is needed in DALI
      pass

    def setup(self, stage=None):
        device_id = self.local_rank
        shard_id = self.global_rank
        num_shards = self.trainer.world_size
        mnist_pipeline = pipe_gds(batch_size=BATCH_SIZE, device_id=device_id, num_threads=4)

        self.train_loader = DALIClassificationIterator(mnist_pipeline, reader_name="Reader",
                                                       last_batch_policy=LastBatchPolicy.PARTIAL)

    def train_dataloader(self):
        return self.train_loader

    def process_batch(self, batch):
        x = batch[0]["data"]
        y = batch[0]["label"].squeeze(-1)
        return (x, y)

if 'PL_TRAINER_GPUS' in os.environ:
    os.environ.pop('PL_TRAINER_GPUS')
model = DALITestMNIST()
trainer = Trainer(accelerator='gpu', max_epochs=5, devices=1, num_sanity_val_steps=0)
trainer.fit(model)

The MNIST files may be too small to show the performance difference with GDS, though. You can check [this thread](https://github.com/NVIDIA/DALI/issues/3972) for more details regarding performance in selected scenarios.
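
If it helps with the comparison, a rough timing sketch could look like the one below (the path, batch size, and iteration count are assumptions, not from this issue): it runs the same numpy reader once with device='cpu' (plain POSIX reads) and once with device='gpu' (GDS), so both cases read identical data.

import time
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn

data_dir = "./data/train/img"  # placeholder directory with .npy files

@pipeline_def(batch_size=64, num_threads=4, device_id=0)
def numpy_pipe(device):
    return fn.readers.numpy(device=device, file_root=data_dir, name="Reader")

def time_reads(device, iters=200):
    pipe = numpy_pipe(device=device)
    pipe.build()
    pipe.run()  # warm-up iteration so the one-time build/setup cost is excluded
    start = time.perf_counter()
    for _ in range(iters):
        pipe.run()
    return time.perf_counter() - start

print("numpy reader, device='cpu' (POSIX):", time_reads('cpu'), "s")
print("numpy reader, device='gpu' (GDS):  ", time_reads('gpu'), "s")

With samples as small as MNIST's, the CPU path will likely win; the GDS benefit typically shows up with much larger arrays.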
aiinfralab commented 1 year ago

Hi @JanuszL

Thanks to you, I was able to solve the problem I was given. Thank you very much.

I have tested the sources you provided in various ways and found that building the pipeline is actually slower than not using GDS, but once the pipeline is built, the overall process runs more quickly because it is reused over and over again during training.

Thanks again.