NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0
5.08k stars 615 forks source link

rand_augment usage error #5636

Closed CoinCheung closed 4 days ago

CoinCheung commented 5 days ago

Version

1.31.0

Describe the bug.

When rand_augment is added to the pipeline, the program crashes.

Minimum reproducible example

import os
import os.path as osp
import lmdb
import dill
import numpy as np
import re
import time
import random

import torch
import torch.distributed as dist

from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy, DALIGenericIterator
from nvidia.dali.pipeline import pipeline_def
from nvidia.dali.auto_aug import rand_augment
import nvidia.dali.types as types
import nvidia.dali.fn as fn

class Pickler(object):
    """Pickler object handed to the pipeline as ``py_callback_pickler``.

    Serialization is delegated to ``dill``. Before dumping, the object's
    ``env`` attribute is closed (when set) and reset to ``None``; after
    loading, ``init_lmdb()`` is called when the loaded object provides it.
    """

    def dumps(self, inp):
        """Close and clear ``inp.env``, then return ``inp`` serialized by dill.

        ``inp`` itself is modified: after this call ``inp.env`` is ``None``,
        whether or not the attribute existed before.
        """
        env = getattr(inp, 'env', None)
        if env is not None:
            env.close()
        inp.env = None
        return dill.dumps(inp)

    def loads(self, inp):
        """Deserialize ``inp`` with dill and re-initialize the result if possible.

        ``init_lmdb()`` is only invoked when the loaded object defines it.
        Objects without that method (for example ``ExternalInputIterator`` as
        written in this file) are returned as loaded instead of failing with
        ``AttributeError``.
        """
        out = dill.loads(inp)
        init_lmdb = getattr(out, 'init_lmdb', None)
        if callable(init_lmdb):
            init_lmdb()
        return out

import numpy as np
class ExternalInputIterator(object):
    """Iterator that yields this rank's share of each global batch of image files.

    The annotation file lists one image path per line. Sample indices are
    shuffled epoch by epoch with a seed derived only from the epoch number
    (``321 + epoch``), and each call to ``__next__`` consumes
    ``global_batchsize`` indices, of which this rank keeps every
    ``world_size``-th one starting at ``rank``.

    NOTE(review): numpy is imported inside every method, as in the original.
    Presumably this keeps ``np`` resolvable after the object is deserialized in
    a spawned worker process - confirm before hoisting the imports.
    """

    def __init__(self, file_root, file_anno, rank, world_size, n_epochs, global_batchsize):
        """Read the annotation file and prepare the first block of indices.

        Args:
            file_root: stored on the instance; the paths read from
                ``file_anno`` are opened as-is and are not joined with it.
            file_anno: text file with one image path per line.
            rank: offset of this process inside a global batch.
            world_size: stride used to pick this rank's samples.
            n_epochs: number of passes over the data; only used to derive
                ``n_iters``.
            global_batchsize: number of indices consumed per iteration.
        """
        self.rank, self.world_size = rank, world_size
        self.global_batchsize = global_batchsize
        self.batchsize = global_batchsize // world_size
        self.file_root = file_root
        self.file_anno = file_anno
        self.n_epochs = n_epochs

        with open(file_anno, 'r') as fr:
            lines = fr.read().splitlines()
        self.n_samples = len(lines)
        self.im_paths = list(lines)

        self.ep = 0
        self.iter = 0
        self.pos = 0
        # Total number of iterations; a trailing partial global batch is dropped.
        self.n_iters = (self.n_samples * self.n_epochs) // global_batchsize

        import numpy as np
        self.one_ep_inds = np.arange(self.n_samples, dtype=np.uint32)
        self.indices = np.array([], dtype=np.uint32)
        self.indices = self.gen_indices()

    def gen_indices(self):
        """Append ten more shuffled epochs of indices and drop the consumed prefix.

        ``one_ep_inds`` is shuffled in place once per epoch, so each epoch's
        order is a reshuffle of the previous one. After the call ``self.pos``
        is 0 and ``self.ep`` has advanced by 10.

        Returns:
            The updated ``self.indices`` array.
        """
        import numpy as np
        more_inds = np.empty(10 * self.n_samples, dtype=np.uint32)
        for cnt in range(10):
            ep = cnt + self.ep
            random_rng = np.random.default_rng(321 + ep)
            random_rng.shuffle(self.one_ep_inds)
            st, ed = cnt * self.n_samples, (cnt + 1) * self.n_samples
            more_inds[st:ed] = self.one_ep_inds
        self.indices = np.hstack([self.indices, more_inds])
        # Discard everything before the read position, then rewind it.
        self.indices = self.indices[self.pos:]
        self.pos = 0
        self.ep += 10
        return self.indices

    def __iter__(self):
        """Reset the iteration counter; the position in the index stream is kept."""
        self.iter = 0
        return self

    def get_bins(self, inds):
        """Read the files selected by ``inds`` and return them as uint8 arrays."""
        import numpy as np
        im_bins = []
        for ind in inds:
            impth = self.im_paths[ind]
            with open(impth, 'rb') as fr:
                im_bin = fr.read()
            im_bins.append(np.frombuffer(im_bin, dtype=np.uint8))
        return im_bins

    def __next__(self):
        """Return ``(encoded_images, indices)`` for this rank's part of the next batch.

        Returns:
            A list of 1-D uint8 arrays (raw file contents) and the matching
            sample indices as an int64 array.

        Raises:
            StopIteration: once ``n_iters`` batches were produced since the
                last ``__iter__`` call.
        """
        import numpy as np

        if self.iter >= self.n_iters:
            raise StopIteration

        # Refill until a whole global batch is buffered. One refill only adds
        # 10 * n_samples indices, which is not enough when global_batchsize
        # exceeds that, so a single `if` would hand out a truncated batch.
        while self.pos + self.global_batchsize >= self.indices.shape[0]:
            self.indices = self.gen_indices()
        st, ed = self.pos, self.pos + self.global_batchsize
        inds = self.indices[st:ed]
        inds = inds[self.rank::self.world_size]
        im_bins = self.get_bins(inds)

        self.iter += 1
        self.pos = ed

        return im_bins, inds.astype(np.int64)

    #  next = __next__

def decode_func(images, dali_cpu):
    """Decode a batch of encoded images to RGB and return it on the GPU.

    Args:
        images: DALI node holding the encoded image buffers.
        dali_cpu: when true, decode with the ``'cpu'`` backend and copy the
            result to the GPU afterwards; otherwise decode with ``'mixed'``.
            The original hard-coded ``'mixed'`` and ignored this flag.

    Returns:
        The decoded RGB images as a GPU node.
    """
    decoder_device = 'cpu' if dali_cpu else 'mixed'
    # The nvJPEG preallocation hints (device_memory_padding=211025920,
    # host_memory_padding=140544512, preallocate_width_hint=5980,
    # preallocate_height_hint=6430) were commented out in the original call
    # and stay disabled here.
    images = fn.decoders.image(images, device=decoder_device, output_type=types.RGB)
    return images.gpu()

def trans_func_imagenet(images, shape, cropsize, random_area, image_mean, image_std):
    """Augment decoded images and return them normalized as float.

    Steps, in order: random resized crop to ``cropsize``, horizontal flip with
    probability 0.5, RandAugment (n=2, m=24, 31 magnitude bins), a transpose
    that moves the last axis first (HWC -> CHW, assuming channel-last decoder
    output), and normalization by ``image_mean`` / ``image_std`` scaled to the
    0-255 range.

    Args:
        images: decoded images on the GPU.
        shape: per-sample shape forwarded to ``rand_augment``.
            NOTE(review): the caller measures it on the encoded file, i.e.
            before the crop changes the image size - confirm that this is the
            intended reference for ``max_translate_rel``.
        cropsize: output size of the random resized crop.
        random_area: area range forwarded to the crop.
        image_mean: per-channel mean, given for values in [0, 1].
        image_std: per-channel standard deviation, given for values in [0, 1].
    """
    cropped = fn.random_resized_crop(
        images,
        device='gpu',
        size=cropsize,
        random_aspect_ratio=[0.75, 1.333333],
        random_area=random_area,
        antialias=True,
        interp_type=types.INTERP_LINEAR,
    )

    flip_flag = fn.random.coin_flip(probability=0.5)
    flipped = fn.flip(cropped, horizontal=flip_flag)

    augmented = rand_augment.rand_augment(
        flipped,
        shape=shape,
        n=2,
        m=24,
        num_magnitude_bins=31,
        max_translate_rel=[0.2, 0.2],
    )

    channel_first = fn.transpose(augmented, perm=[2, 0, 1])
    # Reshape the statistics to (C, 1, 1) so they line up with the transposed layout.
    scaled_mean = 255 * np.array(image_mean).reshape(-1, 1, 1)
    scaled_std = 255 * np.array(image_std).reshape(-1, 1, 1)
    return fn.normalize(
        channel_first,
        dtype=types.FLOAT,
        mean=scaled_mean,
        stddev=scaled_std,
    )

@pipeline_def(enable_conditionals=True)
#  Alternative without conditionals: @pipeline_def
def pipeline_imagenet_pretrain(source, shard_id, num_shards, dali_cpu=False,
                                  random_area=[0.67, 1.0], cropsize=(256, 256),
                                  n_views=1, image_mean=[0.485, 0.456, 0.406],
                                  image_std=[0.229, 0.224, 0.225],
                                  ):
    """Define the pipeline: external source -> decode -> ``n_views`` augmented views.

    Args:
        source: batch iterator yielding ``(encoded_images, indices)``.
        shard_id: accepted but not used in the body.
        num_shards: accepted but not used in the body.
        dali_cpu: forwarded to ``decode_func``.
        random_area: area range for the random resized crop.
        cropsize: output size of the crop.
        n_views: number of independently augmented views per sample.
        image_mean: per-channel mean used for normalization.
        image_std: per-channel standard deviation used for normalization.

    Returns:
        The ``n_views`` augmented image outputs followed by the sample indices.
    """
    encoded, inds = fn.external_source(
        source=source,
        num_outputs=2,
        dtype=[types.UINT8, types.INT64],
        parallel=True,
        batch=True,
        cycle='raise',
    )

    # The shape is read from the encoded buffers, before decoding.
    shape = fn.peek_image_shape(encoded)
    decoded = decode_func(encoded, dali_cpu)

    views = [
        trans_func_imagenet(decoded, shape, cropsize,
                            random_area, image_mean, image_std)
        for _ in range(n_views)
    ]

    return (*views, inds)

def create_dali_loader():
    """Build the DALI pipeline and wrap it in a PyTorch-facing iterator.

    The process topology is read from the ``RANK``, ``LOCAL_RANK``,
    ``WORLD_SIZE`` and ``LOCAL_WORLD_SIZE`` environment variables; a missing
    variable raises ``KeyError``.

    Returns:
        A ``(data_loader, n_iters)`` tuple, where ``n_iters`` is taken from
        the external input iterator.
    """
    env = os.environ
    rank = int(env['RANK'])
    local_rank = int(env['LOCAL_RANK'])
    world_size = int(env['WORLD_SIZE'])
    # Not used below; the lookup is kept so a missing variable still fails here.
    local_world_size = int(env['LOCAL_WORLD_SIZE'])

    im_root = ''
    im_anno = '/path/to/image_pathes_line_by_line.txt'

    batchsize, n_epochs = 1024, 100
    py_num_workers, dali_num_threads = 1, 8

    source = ExternalInputIterator(
        file_root=im_root,
        file_anno=im_anno,
        rank=rank,
        world_size=world_size,
        n_epochs=n_epochs,
        global_batchsize=batchsize,
    )

    # NOTE(review): the same value (1024) is used as the source's global batch
    # size and as the pipeline's batch_size, and shard_id is the local rank
    # while the source is indexed by the global rank - confirm both are intended.
    pipe = pipeline_imagenet_pretrain(
        source=source,
        shard_id=local_rank,
        num_shards=world_size,
        dali_cpu=False,
        batch_size=batchsize,
        num_threads=dali_num_threads,
        device_id=local_rank,
        seed=12 + local_rank,
        prefetch_queue_depth=4,
        py_start_method='spawn',
        py_num_workers=py_num_workers,
        py_callback_pickler=Pickler(),
    )
    pipe.build()

    n_views = 1
    output_map = [f'data_{i}' for i in range(n_views)]
    output_map.append('indices')
    data_loader = DALIGenericIterator(
        pipe,
        output_map,
        last_batch_policy=LastBatchPolicy.DROP,
        auto_reset=True,
    )
    return data_loader, source.n_iters

if __name__ == '__main__':
    # Bind this process to its GPU before the NCCL process group is created.
    # LOCAL_RANK comes from the environment (the issue launches the script
    # with torchrun).
    gpu_index = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(gpu_index)
    dist.init_process_group(backend='nccl')

    loader, n_total_iters = create_dali_loader()

    # Print the shape of the first view of every batch.
    for batch in loader:
        first_view = batch[0]['data_0']
        print(first_view.shape)

Relevant log output

W0918 03:17:56.166000 140680036923200 torch/distributed/elastic/multiprocessing/api.py:858] Sending process 22197 closing signal SIGTERM
W0918 03:17:56.167000 140680036923200 torch/distributed/elastic/multiprocessing/api.py:858] Sending process 22198 closing signal SIGTERM
W0918 03:17:56.167000 140680036923200 torch/distributed/elastic/multiprocessing/api.py:858] Sending process 22200 closing signal SIGTERM
/opt/miniconda3/envs/py310/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 8 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
/opt/miniconda3/envs/py310/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 8 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
/opt/miniconda3/envs/py310/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 8 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
E0918 03:17:56.596000 140680036923200 torch/distributed/elastic/multiprocessing/api.py:833] failed (exitcode: -11) local_rank: 2 (pid: 22199) of binary: /opt/miniconda3/envs/py310/bin/python
Traceback (most recent call last):
  File "/opt/miniconda3/envs/py310/bin/torchrun", line 33, in <module>
    sys.exit(load_entry_point('torch==2.4.0', 'console_scripts', 'torchrun')())
  File "/opt/miniconda3/envs/py310/lib/python3.10/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 348, in wrapper
    return f(*args, **kwargs)
  File "/opt/miniconda3/envs/py310/lib/python3.10/site-packages/torch/distributed/run.py", line 901, in main
    run(args)
  File "/opt/miniconda3/envs/py310/lib/python3.10/site-packages/torch/distributed/run.py", line 892, in run
    elastic_launch(
  File "/opt/miniconda3/envs/py310/lib/python3.10/site-packages/torch/distributed/launcher/api.py", line 133, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
  File "/opt/miniconda3/envs/py310/lib/python3.10/site-packages/torch/distributed/launcher/api.py", line 264, in launch_agent

Other/Misc.

No response

Check for duplicates

JanuszL commented 4 days ago

Hi @CoinCheung,

Thank you for reaching out. I'm afraid the error message you shared doesn't provide any clue what might have gone wrong. I tried to run your code but all I get is:

torchrun --nproc_per_node=1 test153.py
Traceback (most recent call last):
  File "/home/user/Dali/dali/test153.py", line 263, in <module>
    dataloader, total_iters = create_dali_loader()
  File "/home/user/Dali/dali/test153.py", line 222, in create_dali_loader
    source = ExternalInputIterator(
  File "/home/user/Dali/dali/test153.py", line 48, in __init__
    with open(file_anno, 'r') as fr:
FileNotFoundError: [Errno 2] No such file or directory: './datasets/pil_save/lmdb/all_dedup_lmdb.txt.shape'
Exception ignored in: <function ExternalInputIterator.__del__ at 0x7a224b7c9a20>
Traceback (most recent call last):
  File "/home/user/Dali/dali/test153.py", line 78, in __del__
    if not self.env is None:
AttributeError: 'ExternalInputIterator' object has no attribute 'env'
[2024-09-18 09:20:28,890] torch.distributed.elastic.multiprocessing.api: [ERROR] failed (exitcode: 1) local_rank: 0 (pid: 2171562) of binary: /usr/bin/python3
Traceback (most recent call last):
  File "/usr/local/bin/torchrun", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 346, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/run.py", line 806, in main
    run(args)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/run.py", line 797, in run
    elastic_launch(
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/launcher/api.py", line 134, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/launcher/api.py", line 264, in launch_agent
    raise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError:

which makes me wonder whether the code you provided is complete/self-contained. Does your problem reproduce without using rand_augment?

CoinCheung commented 4 days ago

Yes, if I comment out the rand_augment line, the code works well. The error message you posted above is associated with this line: Image

I created a txt file in which each line is a path to an image on the hard disk. I then pass this txt file in the line above, so the code should be able to run.

JanuszL commented 4 days ago

@CoinCheung,

I created a txt file in which each line is a path to an image on the hard disk. I then pass this txt file in the line above, so the code should be able to run.

Can you extend the example to generate all the necessary prerequisites so I'm sure that I'm running the same code as you do?

CoinCheung commented 4 days ago

@JanuszL Hi, here is a piece of sample code, you can run torchrun --nproc_per_node=4 main.py to see the error. Please download this: https://github.com/CoinCheung/eewee/releases/download/0.0.0/sample.zip

mzient commented 4 days ago

Hello @CoinCheung, the bug is confirmed. Serendipitously, we fixed it recently while working on another feature. Please try the latest nightly build (from Sep 17th) - it should fix the problem. The upcoming release 1.42 will include the fix.

CoinCheung commented 4 days ago

Thanks for letting me know; I will wait for the new release.