NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

Cannot preprocess ImageNet dataset with DALI #5584

Open Weigaa opened 1 month ago

Weigaa commented 1 month ago

Describe the question.

I am trying to compare DALI and PyTorch (torchvision) preprocessing speed with the following code:

```python
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch
from time import time


def torchvision_pipeline(batch_size, num_workers, data_path):
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])

    dataset = datasets.ImageFolder(root=data_path, transform=transform)
    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                         num_workers=num_workers, shuffle=True)

    start_time = time()
    for data, target in loader:
        pass
    end_time = time()
    print(f"Torchvision processing time: {end_time - start_time} seconds")


# Parameters
batch_size = 256
num_workers = 16
data_path = '/home/lthpc/nvmessd/weizheng/data/imagenet/train'
torchvision_pipeline(batch_size, num_workers, data_path)


from nvidia.dali.pipeline import Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy
import time


class DALIPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, data_dir, device):
        super(DALIPipeline, self).__init__(batch_size, num_threads, device_id)
        self.device = device
        self.input = fn.readers.file(file_root=data_dir, random_shuffle=True, name="Reader")
        self.decode = fn.decoders.image(self.input, device=self.device, output_type=types.RGB)
        self.rescrop = fn.random_resized_crop(self.decode, size=(224, 224))
        self.rng = fn.random.coin_flip(probability=0.5)  # randomly decide whether to flip
        self.flip = fn.flip(self.rescrop, horizontal=self.rng)  # use the random value as the flip flag
        self.cmn = fn.crop_mirror_normalize(
            self.flip,
            dtype=types.FLOAT,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
            output_layout="CHW"
        )

    def define_graph(self):
        images = self.cmn
        return images


def run_dali_pipeline(batch_size, num_threads, device_id, data_dir, device):
    start_time = time.time()
    pipe = DALIPipeline(batch_size=batch_size, num_threads=num_threads,
                        device_id=device_id, data_dir=data_dir, device=device)
    pipe.build()

    dali_loader = DALIClassificationIterator(pipe, reader_name="Reader",
                                             last_batch_policy=LastBatchPolicy.PARTIAL)

    for data in dali_loader:
        pass
    end_time = time.time()
    dali_loader.reset()

    print(f"DALI {device.upper()} processing time: {end_time - start_time} seconds")


# Parameters
batch_size = 256
num_threads = 16
device_id = 0
data_dir = '/home/lthpc/nvmessd/weizheng/data/imagenet/train'

# Run experiments
run_dali_pipeline(batch_size, num_threads, device_id, data_dir, 'cpu')
run_dali_pipeline(batch_size, num_threads, device_id, data_dir, 'gpu')
run_dali_pipeline(batch_size, num_threads, device_id, data_dir, 'mixed')
```

The PyTorch part runs fine, but the DALI part fails with the following errors:

```
[ERROR] [jpeg_parser] Could not check if code stream can be parsed - Failed to read
[ERROR] [jpeg_parser] Could not check if code stream can be parsed - Failed to read
... (the line above is repeated and interleaved many times) ...

Traceback (most recent call last):
  File "/home/lthpc/nvmessd/wj/ddlp/comparedali.py", line 85, in <module>
    run_dali_pipeline(batch_size, num_threads, device_id, data_dir, 'cpu')
  File "/home/lthpc/nvmessd/wj/ddlp/comparedali.py", line 68, in run_dali_pipeline
    dali_loader = DALIClassificationIterator(pipe, reader_name="Reader", last_batch_policy=LastBatchPolicy.PARTIAL)
  File "/home/lthpc/anaconda3/envs/wjtorch2.0.1/lib/python3.11/site-packages/nvidia/dali/plugin/pytorch/__init__.py", line 441, in __init__
    super(DALIClassificationIterator, self).__init__(
  File "/home/lthpc/anaconda3/envs/wjtorch2.0.1/lib/python3.11/site-packages/nvidia/dali/plugin/pytorch/__init__.py", line 224, in __init__
    self._first_batch = DALIGenericIterator.__next__(self)
  File "/home/lthpc/anaconda3/envs/wjtorch2.0.1/lib/python3.11/site-packages/nvidia/dali/plugin/pytorch/__init__.py", line 239, in __next__
    outputs = self._get_outputs()
  File "/home/lthpc/anaconda3/envs/wjtorch2.0.1/lib/python3.11/site-packages/nvidia/dali/plugin/base_iterator.py", line 385, in _get_outputs
    outputs.append(p.share_outputs())
  File "/home/lthpc/anaconda3/envs/wjtorch2.0.1/lib/python3.11/site-packages/nvidia/dali/pipeline.py", line 1160, in share_outputs
    return self._pipe.ShareOutputs()
RuntimeError: Critical error in pipeline: Error in CPU operator nvidia.dali.fn.decoders.image, which was used in the pipeline definition with the following traceback:

  File "/home/lthpc/nvmessd/wj/ddlp/comparedali.py", line 85, in <module>
    run_dali_pipeline(batch_size, num_threads, device_id, data_dir, 'cpu')
  File "/home/lthpc/nvmessd/wj/ddlp/comparedali.py", line 64, in run_dali_pipeline
    pipe = DALIPipeline(batch_size=batch_size, num_threads=num_threads, device_id=device_id, data_dir=data_dir,
  File "/home/lthpc/nvmessd/wj/ddlp/comparedali.py", line 45, in __init__
    self.decode = fn.decoders.image(self.input, device=self.device, output_type=types.RGB)

encountered:

nvImageCodec failure: '#4'
Current pipeline object is no longer valid.
```

I am using the latest DALI:

```
nvidia-dali-cuda120       1.39.0    pypi_0    pypi
python                    3.11.4
```

How can I solve these errors? Thank you.


szkarpinski commented 1 month ago

Hello @Weigaa, thank you for reaching out! I'll try to reproduce your issue on my machine in a moment.

szkarpinski commented 1 month ago

@Weigaa, `fn.readers.file` returns tuples (file contents, label), so you probably need to replace

```python
self.input = fn.readers.file(file_root=data_dir, random_shuffle=True, name="Reader")
```

with something like:

```python
self.input, self.labels = fn.readers.file(file_root=data_dir, random_shuffle=True, name="Reader")
```

and then you'll probably want to return both the images and their labels from your pipeline in `define_graph`:

```python
return images, self.labels
```
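
Putting both changes together, your class would look roughly like this (just a sketch based on your snippet above, not something I have run):

```python
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types


class DALIPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, data_dir, device):
        super(DALIPipeline, self).__init__(batch_size, num_threads, device_id)
        # the reader yields (encoded image, label) pairs -- keep both
        self.input, self.labels = fn.readers.file(file_root=data_dir,
                                                  random_shuffle=True, name="Reader")
        self.decode = fn.decoders.image(self.input, device=device, output_type=types.RGB)
        self.rescrop = fn.random_resized_crop(self.decode, size=(224, 224))
        self.flip = fn.flip(self.rescrop, horizontal=fn.random.coin_flip(probability=0.5))
        self.cmn = fn.crop_mirror_normalize(
            self.flip, dtype=types.FLOAT,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
            output_layout="CHW")

    def define_graph(self):
        # DALIClassificationIterator expects (data, label) outputs
        return self.cmn, self.labels
```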

By the way, we have the @pipeline_def decorator, which allows you to define a pipeline in a more convenient way without subclassing Pipeline. You can see it in the example code here.
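
For illustration, a @pipeline_def version of your pipeline could look roughly like the sketch below (untested; the function name `imagenet_pipeline` and the `decoder_device` argument are just placeholders):

```python
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types


@pipeline_def
def imagenet_pipeline(data_dir, decoder_device):
    # decoder_device is "cpu" or "mixed" ("mixed" decodes on the GPU)
    jpegs, labels = fn.readers.file(file_root=data_dir, random_shuffle=True, name="Reader")
    images = fn.decoders.image(jpegs, device=decoder_device, output_type=types.RGB)
    images = fn.random_resized_crop(images, size=(224, 224))
    images = fn.flip(images, horizontal=fn.random.coin_flip(probability=0.5))
    images = fn.crop_mirror_normalize(
        images, dtype=types.FLOAT,
        mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
        std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
        output_layout="CHW")
    return images, labels


pipe = imagenet_pipeline(data_dir, "cpu", batch_size=256, num_threads=16, device_id=0)
pipe.build()
```

Note that batch_size, num_threads and device_id become keyword arguments of the decorated function, so you pass them when you instantiate the pipeline rather than in a constructor.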