NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

external_source doesn't return last incomplete batch #5199

Open JoostvDoorn opened 8 months ago

JoostvDoorn commented 8 months ago

Version

1.31.0

Describe the bug.

An fn.external_source with a callable doesn't return the last, incomplete batch. I have tried using LastBatchPolicy.PARTIAL, but last_batch_policy seems to have no effect on external_source. Is there any way to ensure the last batch is always returned? This looks like a bug.

Minimal reproducible example

import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy

batch_size = 32

class ExternalInputCallable:
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.length = 100
        self.full_iterations = self.length // batch_size

    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_idx >= self.length:
            print("STOPPED", sample_idx)
            # Indicate end of the epoch
            raise StopIteration
        return np.array([1], dtype=np.int32), np.array([sample_idx], dtype=np.int32)

@pipeline_def(batch_size=batch_size, num_threads=1, device_id=0)
def callable_pipeline():
    data, label = fn.external_source(source=ExternalInputCallable(batch_size), num_outputs=2, batch=False,
                                       dtype=[types.INT32, types.INT32])
    return data, label

call_pipe = callable_pipeline()
call_pipe.build()

loader = DALIGenericIterator(
    call_pipe,
    last_batch_policy=LastBatchPolicy.PARTIAL,
    output_map=['data', 'label'],
    auto_reset=True,
)
max_label = -1
for batch in loader:
    max_label = max(batch[0]["label"].flatten().max().item(), max_label)
    print(max_label, batch[0]["label"].shape)
print(max_label)

We expect max_label to be 99 (the index of the last sample).

Relevant log output

31 torch.Size([32, 1])
STOPPED 100
63 torch.Size([32, 1])
95 torch.Size([32, 1])
95
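The numbers in this log are consistent with a simple rule: a batch is emitted only if the callable produces every sample in it, so the StopIteration raised for index 100 discards the whole fourth batch (indices 96 to 127), taking the valid indices 96 to 99 with it. The toy model below is an assumption inferred from the log, not DALI's actual implementation; it only reproduces the arithmetic (3 full batches, last label 95, 4 samples lost).

```python
def model_epoch(length, batch_size):
    """Toy model inferred from the log (an assumption, not DALI internals):
    a batch is kept only if every index in it is < length; the first batch
    that contains an out-of-range index ends the epoch and is dropped."""
    emitted, start = [], 0
    while True:
        indices = list(range(start, start + batch_size))
        if indices[-1] >= length:  # some index in this batch makes the callable raise StopIteration
            return emitted
        emitted.append(indices)
        start += batch_size


batches = model_epoch(length=100, batch_size=32)
dropped = 100 - sum(len(b) for b in batches)
print(len(batches), batches[-1][-1], dropped)  # 3 95 4
```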

Other/Misc.

> pip show nvidia-dali-cuda120
Name: nvidia-dali-cuda120
Version: 1.31.0
Summary: NVIDIA DALI  for CUDA 12.0. Git SHA: 166a2e445992d4a0fca41be32ef897ea0a526a6e
Home-page: https://github.com/NVIDIA/dali
Author: NVIDIA Corporation
Author-email: 
License: Apache License 2.0


awolant commented 7 months ago

Hello @JoostvDoorn, thank you for creating the issue and thanks for the repro you provided.

The behavior you are seeing is not controlled by the DALIGenericIterator parameters. That's why you see no difference in the output when you change them. If you remove the iterator layer from your repro, you get the same result:

import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types

batch_size = 32

class ExternalInputCallable:
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.length = 100
        self.full_iterations = self.length // batch_size

    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_idx >= self.length:
            print("STOPPED", sample_idx)
            # Indicate end of the epoch
            raise StopIteration
        return np.array([1], dtype=np.int32), np.array([sample_idx], dtype=np.int32)

@pipeline_def(batch_size=batch_size, num_threads=1, device_id=0)
def callable_pipeline():
    data, label = fn.external_source(source=ExternalInputCallable(batch_size), num_outputs=2, batch=False,
                                       dtype=[types.INT32, types.INT32])
    return data, label

call_pipe = callable_pipeline()
call_pipe.build()

max_label = -1
for i in range(10):
    batch = call_pipe.run()
    max_label = max(batch[1].as_array().flatten().max().item(), max_label)

    print(max_label)

Output:

31
63
STOPPED 100
95
Traceback (most recent call last):
  File "/home/awolant/Projects/DALI/dev/issue/5199.py", line 46, in <module>
    batch = call_pipe.run()
  File "/home/awolant/.local/lib/python3.10/site-packages/nvidia/dali/pipeline.py", line 1176, in run
    return self.outputs()
  File "/home/awolant/.local/lib/python3.10/site-packages/nvidia/dali/pipeline.py", line 1015, in outputs
    raise StopIteration
StopIteration

What happens here is controlled by the pipeline and the ExternalInputCallable you created. Note how it is configured:

self.batch_size = batch_size
self.length = 100
self.full_iterations = self.length // batch_size

In your example, with batch_size == 32 you get full_iterations == 3. That is why your pipeline returns only 3 iterations and you lose the samples beyond them. So to make this work as you intend, you need to support it in ExternalInputCallable. With the current implementation, all DALI facilities see full_iterations == 3 and act accordingly, which produces the result we see.

Just one more note on this: if you plan to use the parallel external source for your use case, you have to implement batch padding rather than returning a partial batch. Padding is a requirement for parallel mode and is mentioned in the Parallel External Source docs.

Hope that helps.

JoostvDoorn commented 7 months ago

> In your example, with batch_size == 32 you get full_iterations == 3. That is why your pipeline returns only 3 iterations and you lose the samples beyond them. So to make this work as you intend, you need to support it in ExternalInputCallable. With the current implementation, all DALI facilities see full_iterations == 3 and act accordingly, which produces the result we see.

I don't actually use full_iterations; I use self.length to raise StopIteration once the length is exceeded. But I see your point: the Parallel External Source docs document this restriction. I didn't see it in the external_source API docs, so I wasn't able to find out about this behavior. I suppose I should pad the last batch and add a mask output, so that partial batches can be recovered. It would seem reasonable to support this on the library side: since sample_info is produced by the library anyway, the library should be able to control partial-batch generation.
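The padding-plus-mask workaround described above could look roughly like the sketch below. It is a minimal illustration, assuming that padding the epoch up to a multiple of batch_size keeps every batch full; the class PaddedExternalInputCallable, the helpers drop_padding and simulate_epoch, and FakeSampleInfo are all hypothetical names. FakeSampleInfo stands in for DALI's sample_info (only idx_in_epoch is used), and simulate_epoch stands in for the pipeline so the logic can be checked without a GPU. In a real pipeline the callable would instead be passed to fn.external_source with num_outputs=3, batch=False and dtype=[types.INT32] * 3, with 'mask' added to output_map.

```python
from collections import namedtuple

import numpy as np

# Hypothetical stand-in for DALI's sample_info; only idx_in_epoch is used here.
FakeSampleInfo = namedtuple("FakeSampleInfo", ["idx_in_epoch"])


class PaddedExternalInputCallable:
    """Per-sample source that pads the epoch up to a whole number of batches.

    Indices past `length` repeat the last real sample and get mask == 0,
    so the consumer can drop them after the pipeline has run.
    """

    def __init__(self, batch_size, length=100):
        self.length = length
        num_batches = -(-length // batch_size)  # ceil(length / batch_size)
        self.padded_length = num_batches * batch_size

    def __call__(self, sample_info):
        idx = sample_info.idx_in_epoch
        if idx >= self.padded_length:
            # padded_length is a multiple of batch_size, so this only fires on
            # the first sample of a batch and no real sample is discarded.
            raise StopIteration
        valid = idx < self.length
        src_idx = idx if valid else self.length - 1  # pad by repeating the last sample
        data = np.array([1], dtype=np.int32)
        label = np.array([src_idx], dtype=np.int32)
        mask = np.array([int(valid)], dtype=np.int32)
        return data, label, mask


def drop_padding(output, mask):
    """Keep only the rows of a batched (batch_size, 1) output whose mask is 1."""
    return output[mask.flatten().astype(bool)]


def simulate_epoch(source, batch_size):
    """Hypothetical driver that calls `source` per sample, one batch at a time,
    and stops at the first StopIteration (standing in for the pipeline)."""
    batches, start = [], 0
    while True:
        try:
            samples = [source(FakeSampleInfo(i)) for i in range(start, start + batch_size)]
        except StopIteration:
            return batches
        # Stack each output across the batch: shape (batch_size, 1).
        batches.append(tuple(np.stack(column) for column in zip(*samples)))
        start += batch_size


batches = simulate_epoch(PaddedExternalInputCallable(batch_size=32), batch_size=32)
labels = np.concatenate([drop_padding(label, mask) for _, label, mask in batches])
print(len(batches), labels.shape, labels.max())  # 4 (100, 1) 99
```

Because every batch stays full, this layout should also fit the parallel-mode requirement mentioned earlier in the thread. On the PyTorch side, the same filtering would be applied per batch, for example by indexing the label tensor with the flattened mask converted to bool.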

awolant commented 7 months ago

Right, I misread the code; apologies.

I agree, it would be good to support this. In the meantime, I can extend the docs of the source argument to include this information.

mrclovvn commented 5 months ago

Hi~

I've encountered the same issue with the fn.external_source callable not returning the last batch. I've also tried using LastBatchPolicy.PARTIAL, but it seems to have no effect on external_source. Have you found a solution to this problem, or is there an alternative workaround? I look forward to your response.

Best regards.

JanuszL commented 5 months ago

Hi @mrclovvn,

As @awolant stated, this behavior is up to the external_source callable itself, so your callable should control it. In parallel mode, you need to pad the samples yourself, since returning only a partial batch is not supported there. We have a corresponding improvement on our to-do list.
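For the non-parallel case, "your callable should control this" could mean switching to a batch-mode source that returns a shorter final batch itself. The sketch below is a hedged illustration under two assumptions to verify against the docs for your DALI version: that a one-argument callable used with batch=True receives the iteration index within the epoch, and that the pipeline accepts a batch smaller than its batch_size (variable batch size). BatchExternalInputCallable is a hypothetical name; it would be passed as fn.external_source(source=BatchExternalInputCallable(batch_size), num_outputs=2, batch=True, dtype=[types.INT32, types.INT32]) without parallel=True. The loop at the bottom calls it directly, without DALI, to show the batch sizes it produces.

```python
import numpy as np


class BatchExternalInputCallable:
    """Hypothetical batch-mode source (batch=True, non-parallel).

    Assumed to be called with the iteration index within the epoch; returns
    one list per output, and the final lists may be shorter than batch_size.
    """

    def __init__(self, batch_size, length=100):
        self.batch_size = batch_size
        self.length = length

    def __call__(self, iteration):
        start = iteration * self.batch_size
        if start >= self.length:
            raise StopIteration  # nothing left: end of the epoch
        stop = min(start + self.batch_size, self.length)  # last batch may be partial
        data = [np.array([1], dtype=np.int32) for _ in range(start, stop)]
        labels = [np.array([i], dtype=np.int32) for i in range(start, stop)]
        return data, labels


# Drive the source directly (no DALI) to inspect the batch sizes it produces.
source = BatchExternalInputCallable(batch_size=32)
sizes, iteration = [], 0
while True:
    try:
        data, labels = source(iteration)
    except StopIteration:
        break
    sizes.append(len(labels))
    iteration += 1
print(sizes, int(labels[-1][0]))  # [32, 32, 32, 4] 99
```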