pytorch / data

A PyTorch repo for data loading and utilities to be shared by the PyTorch domain libraries.

CacheHolder buffer size limited to 1000 #781

Open rehno-lindeque opened 1 year ago

rehno-lindeque commented 1 year ago

🐛 Describe the bug

The .on_disk_cache() data pipe uses .demux() under the hood with a default buffer_size of 1000.

Unfortunately this appears to break when the source datapipe has more than 1000 elements. See https://github.com/pytorch/data/blob/v0.4.1/torchdata/datapipes/iter/util/cacheholder.py#L251-L260

For example, something like this should reproduce the problem:

import tempfile

from torchdata import datapipes

temp_dir = tempfile.gettempdir()
caching_datapipe = datapipes.iter.IterableWrapper(
    range(1001)
)
caching_datapipe = caching_datapipe.on_disk_cache(
    filepath_fn=lambda i: f"{temp_dir}/{i}.tmp"
)

# caching_datapipe = caching_datapipe.map(fn=_get_from_http) # (not needed for reproducing)

caching_datapipe = caching_datapipe.end_caching(
    filepath_fn=lambda i: f"{temp_dir}/{i}.tmp"
)

for element in caching_datapipe:
    print(element)

The stack trace ends with:

line 357, in _find_next
    raise BufferError(
BufferError: DemultiplexerIterDataPipe buffer overflow, buffer size 1000 is insufficient.
This exception is thrown by __iter__ of MapperIterDataPipe(datapipe=_ChildDataPipe, fn=<lambda>, input_col=None, output_col=None)

(I haven't been able to find a suitable workaround for this, since the demux is hidden inside the implementation of OnDiskCacheHolder. Any ideas/advice for temporarily working around the issue would also be highly appreciated.)

Versions

Collecting environment information...
PyTorch version: 1.12.1+cu116
Is debug build: False
CUDA used to build PyTorch: 11.6
ROCM used to build PyTorch: N/A

OS: NixOS 22.05 (Quokka) (x86_64)
GCC version: (GCC) 11.3.0
Clang version: Could not collect
CMake version: Could not collect
Libc version: glibc-2.35

Python version: 3.10.6 (main, Aug  1 2022, 20:38:21) [GCC 11.3.0] (64-bit runtime)
Python platform: Linux-5.18.19-x86_64-with-glibc2.35
Is CUDA available: True
CUDA runtime version: Could not collect
GPU models and configuration: GPU 0: NVIDIA GeForce RTX 3060
Nvidia driver version: 515.48.07
cuDNN version: Could not collect
HIP runtime version: N/A
MIOpen runtime version: N/A
Is XNNPACK available: True

Versions of relevant libraries:
[pip3] mypy==0.971
[pip3] mypy-extensions==0.4.3
[pip3] numpy==1.23.1
[pip3] torch==1.12.1+cu116
[pip3] torchdata==0.4.1
[pip3] torchvision==0.13.1+cu116
[conda] Could not collect
ejguan commented 1 year ago

Thank you for opening the issue. The reason demux's buffer blows up is that we yield the cached data first and then the todo data. See: https://github.com/pytorch/data/blob/983e87ada583b7a58d13a1a5f047dd9d256155dd/torchdata/datapipes/iter/util/cacheholder.py#L424

However, it seems weird to me to change the order to yield data from todo first and then cached. A proper solution might be adding a buffer_size argument to end_caching.
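For illustration, from the user's side such a fix might look roughly like this (note that buffer_size below is a hypothetical argument; it does not exist in torchdata 0.4.1):

caching_datapipe = caching_datapipe.end_caching(
    filepath_fn=lambda i: f"{temp_dir}/{i}.tmp",
    buffer_size=-1,  # hypothetical argument; -1 would mean an unbounded demux buffer
)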

rehno-lindeque commented 1 year ago

Personally I think I would expect caching to be FIFO with respect to the source datapipe, in order to be as deterministic as possible.

In other words, if the on-disk cache can cause data in the pipe to be re-ordered as a side-effect, then surely this harms reproducibility. (I.e. a previous run can affect the outcome of the current run)

(Instead of demux, maybe a sum type is needed to properly retain the original ordering of the source datapipe? EDIT: I suppose this would require some kind of filtered "view" of the pipe to maintain the current API...)
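To make the sum-type idea a bit more concrete, here is a rough sketch (illustrative names only, not torchdata code): every element stays in a single ordered stream and is tagged as either cached or to-do, instead of being split into two demuxed branches, so no buffering is needed to preserve the source order.

from dataclasses import dataclass
from functools import partial
from typing import Any, Callable

@dataclass
class Cached:
    filepath: str

@dataclass
class Todo:
    item: Any

def tag(item, cache_check_fn: Callable, filepath_fn: Callable):
    # cache_check_fn and filepath_fn stand in for the checks that
    # OnDiskCacheHolder already performs internally.
    return Cached(filepath_fn(item)) if cache_check_fn(item) else Todo(item)

# tagged_dp = source_dp.map(partial(tag, cache_check_fn=..., filepath_fn=...))
# Downstream stages then branch on Cached/Todo per element, preserving the
# FIFO order of source_dp.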

ejguan commented 1 year ago

Personally I think I would expect caching to be FIFO with respect to the source datapipe, in order to be as deterministic as possible.

Agree. Will take a look later today

rehno-lindeque commented 1 year ago

Thank you, I appreciate the support!

ejguan commented 1 year ago

Did some quick research, and it looks doable: adding a DataPipe that joins two DataPipes with a key_fn should do the job. It does require some changes in CacheHolder, though, and I need to discuss the usage of such a DataPipe with other users to consolidate the API.
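As a rough illustration of the kind of join described here, torchdata already has IterKeyZipper (functional form zip_with_iter), which matches the elements of two DataPipes by a key function; a self-contained toy sketch (not the eventual CacheHolder change):

from torchdata.datapipes.iter import IterableWrapper

source = IterableWrapper([(i, f"item-{i}") for i in range(5)])
ref = IterableWrapper([(i, f"/tmp/{i}.tmp") for i in range(5)])

# Pair every source element with the ref element whose key matches.
joined = source.zip_with_iter(
    ref_datapipe=ref,
    key_fn=lambda x: x[0],
    ref_key_fn=lambda x: x[0],
    keep_key=False,
)

for pair in joined:
    print(pair)  # ((0, 'item-0'), (0, '/tmp/0.tmp')), ...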

Modexus commented 1 year ago

I just ran into this issue. Setting a larger buffer_size fixes it for me atm. Is there a better way to solve this currently?

josiahls commented 1 year ago

I'm caching files from AWS using the S3 loader and hit the same buffer issue. I use the patch function from fastcore to fix the issue temporarily:

# Copy of torchdata's OnDiskCacheHolder._end_caching with the demux buffer made unbounded.
from functools import partial

import torchdata.datapipes as dp
from fastcore.basics import patch


@patch
def _end_caching(self: dp.iter.OnDiskCacheHolder):
    filepath_fn, hash_dict, hash_type, extra_check_fn = dp.iter.OnDiskCacheHolder._temp_dict.pop(self)

    todo_dp, cached_dp = self.source_datapipe.demux(
        2,
        partial(
            dp.iter.OnDiskCacheHolder._cache_check_fn,
            filepath_fn=filepath_fn,
            hash_dict=hash_dict,
            hash_type=hash_type,
            extra_check_fn=extra_check_fn,
        ),
        buffer_size = -1
    )
    # Cached: keep filepath(s)
    cached_dp = cached_dp.map(fn=filepath_fn)
    # Convert list back to single elements
    cached_dp = cached_dp.unbatch(-1)

    self.source_datapipe = todo_dp
    self._end_caching_flag = True
    return cached_dp
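Note that buffer_size=-1 makes the internal demux buffer unbounded: since already-cached elements are yielded first, every not-yet-cached element can pile up in memory. That is usually fine for lists of URLs or file paths, but worth keeping in mind. If you'd rather not depend on fastcore, the same monkey-patch can be applied by defining the function above without the @patch decorator and assigning it onto the class directly (a sketch, assuming dp is torchdata.datapipes as above):

# Equivalent without fastcore: replace the method on the class directly,
# using the _end_caching function defined above (minus the @patch decorator).
dp.iter.OnDiskCacheHolder._end_caching = _end_caching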

Alternatively, I noticed I can do:

    pipe = dp.iter.S3FileLister(pipe).header(100)
    pipe = dp.iter.OnDiskCacheHolder(pipe,filepath_fn=save_path)
    pipe = dp.iter.S3FileLoader(pipe)
    pipe = dp.iter.EndOnDiskCacheHolder(pipe,mode="wb",filepath_fn=save_path)

This will progressively download more files every epoch, but the first approach is probably much better.