NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0
5.06k stars 615 forks source link

How to move depths to the same gpu as decoded_imgs? #5611

Open kristinat8 opened 2 weeks ago

kristinat8 commented 2 weeks ago

Describe the question.

Hi, I want to load images and depth maps via `external_source`. The images are in JPEG format and the depth maps are in NumPy (`.npz`) format, and each depth map must stay matched with its image. In the pipeline the decoded images are on the GPU, but the depth maps are on the CPU. For subsequent processing I need to apply the same cropping to both and stitch (concatenate) them together. The following error occurs in `fn.cat`:

batch_size = 32


class ExternalInputCallable:
    """Per-sample source of (encoded JPEG, depth map) pairs for DALI's external_source.

    Every ``*.jpg`` in ``img_dir`` is paired with the ``.npz`` file of the same
    stem in ``depth_dir``. Images are ordered by the frame number parsed from
    the file name (see ``extract_number``), and the depth list is derived from
    that ordered list, so both outputs stay aligned.

    Args:
        batch_size: Samples per batch; only whole batches are served.
        img_dir: Directory searched for ``*.jpg`` files (default ``'dataset-all'``).
        depth_dir: Directory holding the matching ``.npz`` files (default ``'depth'``).
    """

    def __init__(self, batch_size, img_dir='dataset-all', depth_dir='depth'):
        self.img_dir = img_dir
        self.depth_dir = depth_dir
        self.batch_size = batch_size
        self.img_files = sorted(
            glob.glob(os.path.join(self.img_dir, '*.jpg')), key=self.extract_number
        )
        # The depth file for "<stem>.jpg" is "<depth_dir>/<stem>.npz".
        self.npz_files = [
            os.path.join(self.depth_dir, os.path.splitext(os.path.basename(f))[0] + '.npz')
            for f in self.img_files
        ]
        # Trailing samples that do not fill a whole batch are never served.
        self.full_iterations = len(self.img_files) // batch_size

    def extract_number(self, filename):
        """Return the integer frame number from a file name like ``<prefix>_<N>.<ext>``.

        Raises:
            IndexError / ValueError: if the base name does not follow that pattern.
        """
        base_name = os.path.basename(filename)
        frame_number = int(base_name.split('_')[1].split('.')[0])
        return frame_number

    def __call__(self, sample_info):
        """Load one sample for a per-sample (``batch=False``) external source.

        Args:
            sample_info: DALI sample info; ``idx_in_epoch`` selects the file
                pair and ``iteration`` is compared against the number of full
                batches to detect the end of the epoch.

        Returns:
            ``(encoded_img, depth_data)``: the raw JPEG file bytes as a 1-D
            uint8 array, and the ``'depth'`` array cast to uint8 with a
            trailing channel axis added (``np.expand_dims(..., axis=2)``).

        Raises:
            StopIteration: once all full batches of the epoch have been served.
        """
        if sample_info.iteration >= self.full_iterations:
            raise StopIteration
        sample_index = sample_info.idx_in_epoch

        jpg_filename = self.img_files[sample_index]
        depth_filename = self.npz_files[sample_index]

        # Pass the encoded bytes through unchanged; decoding happens in the pipeline.
        with open(jpg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)

        # NOTE(review): allow_pickle=True lets np.load unpickle object arrays,
        # which can execute arbitrary code from a crafted file. A plain numeric
        # depth array loads fine with NumPy's default (allow_pickle=False);
        # consider dropping it unless the .npz files are fully trusted.
        with np.load(depth_filename, allow_pickle=True) as data:
            # NOTE(review): astype(np.uint8) wraps/truncates values outside
            # 0..255 (e.g. metric depth); confirm this quantization is intended.
            depth_data = data['depth'].astype(np.uint8)
            depth_data = np.expand_dims(depth_data, axis=2)

        return encoded_img, depth_data

@pipeline_def(
    batch_size=batch_size, num_threads=16, device_id=0, py_num_workers=32
)
def combined_pipeline():
    """Decode JPEGs on the GPU and append the depth map as an extra channel.

    Returns:
        ``combined``: the decoded images with the depth map concatenated along
        axis 2 (on the GPU), and ``depths``: the depth batch as produced by the
        external source (on the CPU).
    """
    jpegs, depths = fn.external_source(
        source=ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.UINT8],
    )
    # "mixed" decoding produces its output on the GPU.
    decoded_imgs = fn.decoders.image(jpegs, device="mixed")

    # With a GPU input, fn.cat runs on the GPU and requires *every* input to be
    # there; the external source yields CPU data, so transfer depths explicitly.
    # Without the transfer DALI fails with
    # "The input 1 is not on the requested device (GPU)".
    # NOTE(review): assumes each decoded image and its depth map share H and W.
    combined = fn.cat(decoded_imgs, depths.gpu(), axis=2)
    return combined, depths

pipe = combined_pipeline()
pipe.build()

# Benchmark 100 pipeline iterations. time.perf_counter() is a monotonic,
# high-resolution clock meant for measuring elapsed time; time.time() follows
# the system wall clock and can jump if the clock is adjusted.
# NOTE(review): the measured time includes warm-up of the first iterations, and
# pipe.run() may raise StopIteration if the source runs out of full batches
# before 100 iterations — confirm the dataset is large enough.
start_time = time.perf_counter()
for _ in range(100):
    outputs = pipe.run()

    combined_data = outputs  # tuple of pipeline outputs: (combined, depths)

end_time = time.perf_counter()
print(f"time: {end_time-start_time}")

```text
Traceback (most recent call last):
  File "mashangshan.py", line 70, in <module>
    outputs = pipe.run()
  File "/home/u202320081200023/miniconda3/envs/dora/lib/python3.8/site-packages/nvidia/dali/pipeline.py", line 1328, in run
    return self.outputs()
  File "/home/u202320081200023/miniconda3/envs/dora/lib/python3.8/site-packages/nvidia/dali/pipeline.py", line 1166, in outputs
    return self._outputs()
  File "/home/u202320081200023/miniconda3/envs/dora/lib/python3.8/site-packages/nvidia/dali/pipeline.py", line 1251, in _outputs
    return self._pipe.Outputs()
RuntimeError: Critical error in pipeline:
Error in GPU operator nvidia.dali.fn.cat,
which was used in the pipeline definition with the following traceback:

  File "mashangshan.py", line 62, in combined_pipeline
    combined = fn.cat(decoded_imgs, depths, axis=2)

encountered:

Assert on "inputs_[idx].device == StorageDevice::GPU" failed: The input 1 is not on the requested device (GPU).
C++ context: [/opt/dali/dali/pipeline/workspace/workspace.h:637]

Current pipeline object is no longer valid.
```

Check for duplicates

JanuszL commented 2 weeks ago

Hi @kristinat8,

Thank you for reaching out. Can you try:

```python
combined = fn.cat(decoded_imgs, depths.gpu(), axis=2)
```
kristinat8 commented 2 weeks ago

> Hi @kristinat8,
>
> Thank you for reaching out. Can you try:
>
> ```python
> combined = fn.cat(decoded_imgs, depths.gpu(), axis=2)
> ```

If I want to process the depth data in the pipeline in CHW layout, is there any way to do it? Thanks!

JanuszL commented 2 weeks ago

Hi @kristinat8,

I'm not sure if I understand your ask.

kristinat8 commented 2 weeks ago

> Hi @kristinat8,
>
> I'm not sure if I understand your ask.

Hello, when I use distributed computing, I encountered a decoding error with the following code. Could you please tell me what might be the reason?

@pipeline_def(batch_size=80, num_threads=16, enable_conditionals=True)
def VideoPipe(total_picture, file_list, dfile_list, local_crops_number, frame_per_clip):
    """Decode paired RGB and depth JPEGs with one shared random crop per clip.

    Args:
        total_picture: Number of frames, split into clips of ``frame_per_clip``.
        file_list: File list for the RGB image reader.
        dfile_list: File list for the depth image reader.
        local_crops_number: Number of ``process_local`` crop pairs to produce
            (0 is allowed and yields no local crops).
        frame_per_clip: Number of consecutive frames sharing one crop window.

    Returns:
        The global frame crop and global depth crop from ``process_global``,
        followed by all local frame crops and then all local depth crops.
    """
    rank = utils.get_rank()
    world_size = utils.get_world_size()
    # Both readers use identical sharding and no shuffling so the i-th RGB file
    # and the i-th depth file stay paired on every rank. NOTE(review): this
    # relies on file_list and dfile_list having matching length and order.
    image_reader = fn.readers.file(file_list=file_list, random_shuffle=False, num_shards=world_size, shard_id=rank)
    depth_reader = fn.readers.file(file_list=dfile_list, random_shuffle=False, num_shards=world_size, shard_id=rank)
    # Output 0 of readers.file is the encoded file content.
    encoded_images = image_reader[0]
    encoded_depths = depth_reader[0]

    # Crop windows are sized from the RGB images and reused for the depth
    # images. NOTE(review): assumes depth images have the RGB resolution.
    shapes = fn.peek_image_shape(encoded_images)

    # indices = [0]*frame_per_clip + [1]*frame_per_clip + ...: every frame of
    # clip k takes the crop window generated for sample k.
    # NOTE(review): len(indices) is (total_picture // frame_per_clip) * frame_per_clip;
    # presumably it must match the pipeline batch size (80) — confirm.
    num_clips = total_picture // frame_per_clip
    indices = np.concatenate([i * np.ones(frame_per_clip, dtype=int) for i in range(num_clips)])
    indices = indices.tolist()

    crop_anchor, crop_shape = fn.random_crop_generator(shapes, random_area=[0.2, 1.0])
    crop_anchor = fn.permute_batch(crop_anchor, indices=indices)
    crop_shape = fn.permute_batch(crop_shape, indices=indices)

    # "mixed" decoding already places its output on the GPU, so no `.gpu()`
    # transfer is needed for either stream.
    images = fn.decoders.image_slice(encoded_images, crop_anchor, crop_shape, device="mixed", axis_names="HW")  # HWC
    depths = fn.decoders.image_slice(encoded_depths, crop_anchor, crop_shape, device="mixed", axis_names="HW")  # HWC

    images = fn.resize(images, resize_x=300, resize_y=300, device="gpu")  # HWC
    depths = fn.resize(depths, resize_x=300, resize_y=300, device="gpu")  # HWC

    frames = fn.transpose(images, perm=[2, 0, 1])  # CHW
    depths = fn.transpose(depths, perm=[2, 0, 1])  # CHW

    global1, global1_depth = process_global(frames, depths, indices)

    # Collect local (frame, depth) crop pairs. An explicit loop, unlike
    # `zip(*[...])` unpacking, also works when local_crops_number == 0.
    # Names avoid shadowing the builtin `locals`.
    local_frames, local_depths = [], []
    for _ in range(local_crops_number):
        local_frame, local_depth = process_local(frames, depths)
        local_frames.append(local_frame)
        local_depths.append(local_depth)
    return global1, global1_depth, *local_frames, *local_depths

```text
[ERROR] [nvjpeg_cudadecoder] Could not decode jpeg code stream - nvjpeg error #4 (Jpeg not supported) when running nvjpegJpegStreamParse(handle, static_cast<const unsigned char*>(stream_ctx->encoded_streamdata), stream_ctx->encoded_stream_datasize, false, false, p.parsestate.nvjpegstream) at /home/jenkins/agent/workspace/nvimagecodec/helpers/release_v0.3.0/Release_11/build/extensions/nvjpeg/cuda_decoder.cpp:472
[WARNING] [nvimgcodec] [nvjpeg_cuda_decoder] decode #110 fallback
Epoch: [0/100]: 1%|█ | 5/974 [00:59<3:06:05, 11.52s/it]
[ERROR] [libjpeg_turbo_decoder] Could not decode jpeg code stream - Premature end of JPEG data. Stopped at line 962/2061
```

kristinat8 commented 2 weeks ago

> > I'm not sure if I understand your ask.
> > 
> > * if you want to specify the `depths` layout - please use `layout` argument of [the `external source` operator](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/operations/nvidia.dali.fn.external_source.html#nvidia.dali.fn.external_source)
> > * you can use [the `transpose` operator](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/operations/nvidia.dali.fn.transpose.html) to change the layout
> > * you can use [the `crop_mirror_normalize`](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/operations/nvidia.dali.fn.crop_mirror_normalize.html#) operator to change the data layout using the `output_layout` argument (and leave the other arguments at their defaults)
> 
> Hello, when I use distributed computing, I encountered a decoding error with the following code. Could you please tell me what might be the reason?
> 
> @pipeline_def(batch_size=80, num_threads=16, enable_conditionals=True)
> def VideoPipe(total_picture, file_list, dfile_list, local_crops_number, frame_per_clip):
>     rank = utils.get_rank()
>     world_size = utils.get_world_size()
>     input = fn.readers.file(file_list=file_list, random_shuffle=False, num_shards=world_size, shard_id=rank)
>     depth = fn.readers.file(file_list=dfile_list, random_shuffle=False, num_shards=world_size, shard_id=rank)
>     shapes = fn.peek_image_shape(input[0])
>     # crop_anchor, crop_shape= fn.random_crop_generator(shapes, random_area=[0.2, 1.0])
>     # crop_anchor = fn.permute_batch(crop_anchor, indices=batch_size * [0])
>     # crop_shape = fn.permute_batch(crop_shape , indices=batch_size * [0])
>     # print(crop_shape)
>     
>     # init_crop
>     num_clips = total_picture // frame_per_clip
>     indices = np.concatenate([i * np.ones(frame_per_clip, dtype=int) for i in range(num_clips)])
>     indices = indices.tolist()
> 
>     crop_anchor, crop_shape= fn.random_crop_generator(shapes, random_area=[0.2, 1.0])
>     crop_anchor = fn.permute_batch(crop_anchor, indices=indices)
>     crop_shape = fn.permute_batch(crop_shape , indices=indices)
> 
> 
>     images = fn.decoders.image_slice(input[0], crop_anchor, crop_shape, device="mixed", axis_names="HW").gpu() #HWC
>     depths = fn.decoders.image_slice(depth[0], crop_anchor, crop_shape, device="mixed", axis_names="HW") #HWC
> 
>     images = fn.resize(images, resize_x=300, resize_y=300, device="gpu") #HWC
>     depths = fn.resize(depths, resize_x=300, resize_y=300, device="gpu") #HWC
> 
>     frames = fn.transpose(images, perm=[2, 0, 1]) #CHW
>     depths = fn.transpose(depths, perm=[2, 0, 1]) #CHW
>     # frames = fn.normalize(frames, mean=0.0, stddev=255.0, device="gpu") #CHW
> 
>     global1, global1_depth= process_global(frames, depths, indices)
>     # global1_combined = process_global(frames, depths, indices)
>     locals, local_depths = map(list, zip(*[process_local(frames, depths) for _ in range(local_crops_number)]))
>     return global1, global1_depth, *locals, *local_depths
> ```text
> [ERROR] [nvjpeg_cudadecoder] Could not decode jpeg code stream - nvjpeg error #4 (Jpeg not supported) when running nvjpegJpegStreamParse(handle, static_cast<const unsigned char*>(stream_ctx->encoded_streamdata), stream_ctx->encoded_stream_datasize, false, false, p.parsestate.nvjpegstream) at /home/jenkins/agent/workspace/nvimagecodec/helpers/release_v0.3.0/Release_11/build/extensions/nvjpeg/cuda_decoder.cpp:472
> [WARNING] [nvimgcodec] [nvjpeg_cuda_decoder] decode #110 fallback
> Epoch: [0/100]: 1%|█ | 5/974 [00:59<3:06:05, 11.52s/it]
> [ERROR] [libjpeg_turbo_decoder] Could not decode jpeg code stream - Premature end of JPEG data. Stopped at line 962/2061
> ```

However, I didn't encounter any errors when decoding each input on its own (as in the code below). Why could that be?
```python
@pipeline_def(batch_size=80, num_threads=8, device_id=0, enable_conditionals=True)
def VideoPipe(total_picture, file_list, local_crops_number, frame_per_clip):
    """Depth-only variant: decode ``file_list`` with one shared random crop per clip.

    Crop windows are drawn for a fixed 2160x3840x3 shape instead of being
    peeked from the encoded files. ``local_crops_number`` is accepted for
    signature compatibility but is not used here.

    Returns:
        The cropped batch, resized to 300x300 and transposed from HWC to CHW.
    """
    depth_reader = fn.readers.file(file_list=file_list, random_shuffle=False)
    encoded_depths = depth_reader[0]

    # Clip k contributes frame_per_clip consecutive entries equal to k, so all
    # frames of a clip reuse the crop window drawn for sample k.
    clip_count = total_picture // frame_per_clip
    clip_ids = np.concatenate(
        [clip * np.ones(frame_per_clip, dtype=int) for clip in range(clip_count)]
    ).tolist()

    fixed_shape = np.array([2160, 3840, 3], dtype=np.int64)
    anchor, window = fn.random_crop_generator(fixed_shape, random_area=[0.2, 1.0])
    anchor = fn.permute_batch(anchor, indices=clip_ids)
    window = fn.permute_batch(window, indices=clip_ids)

    cropped = fn.decoders.image_slice(
        encoded_depths, anchor, window, device="mixed", axis_names="HW"
    )  # HWC
    resized = fn.resize(cropped, resize_x=300, resize_y=300, device="gpu")  # HWC
    return fn.transpose(resized, perm=[2, 0, 1])  # CHW
```

JanuszL commented 2 weeks ago

Hi @kristinat8,

Can you provide the full error log? It should print the name of the image that caused this error. It seems that one of the images in your dataset is corrupted. Can you check whether you can open it in an image viewer? If so, it would help us a lot if you could share the image so we can examine it; maybe there is a gap in our decoding support.