pytorch / data

A PyTorch repo for data loading and utilities to be shared by the PyTorch domain libraries.

Race condition when prefetching forks #1162

Open falckt opened 1 year ago

falckt commented 1 year ago

🐛 Describe the bug

When running prefetch on multiple branches of a forked datapipe, it is possible to trigger a race condition.

import time

import torchdata.datapipes as dp
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

def expensive_op(item):
    time.sleep(1)
    return item

fork1, fork2 = (
    dp.iter.IterableWrapper(range(20))
    .map(expensive_op)
    .fork(2)
)
fork1 = fork1.prefetch(2)
fork2 = fork2.prefetch(2)

dl = DataLoader2(
    fork1.zip(fork2),
    reading_service=MultiProcessingReadingService(4),
)

for _ in dl:
    pass

Traceback

Traceback (most recent call last):
  File "/mwes/prefetch.py", line 22, in <module>
    for _ in dl:
  File "/opt/conda/lib/python3.10/site-packages/torchdata/dataloader2/dataloader2.py", line 53, in __next__
    next_val = next(self.dataloader._datapipe_iter)  # type: ignore[arg-type]
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 102, in __iter__
    raise data
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 72, in thread_worker
    item = next(itr)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torchdata/dataloader2/communication/iter.py", line 345, in __iter__
    response.exc.reraise()
  File "/opt/conda/lib/python3.10/site-packages/torchdata/_utils.py", line 52, in reraise
    raise exception
ValueError: Caught ValueError in worker process 0.
Original Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/torchdata/dataloader2/communication/iter.py", line 211, in DataPipeBehindQueues
    value = source_datapipe.nonblocking_next()
  File "/opt/conda/lib/python3.10/site-packages/torchdata/dataloader2/communication/iter.py", line 103, in nonblocking_next
    return next(self._as_iterator)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 102, in __iter__
    raise data
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 72, in thread_worker
    item = next(itr)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/combining.py", line 624, in __iter__
    yield from zip(*iterators)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 102, in __iter__
    raise data
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 72, in thread_worker
    item = next(itr)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 154, in __next__
    return self._get_next()
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 142, in _get_next
    result = next(self.iterator)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/combining.py", line 196, in get_next_element_by_instance
    return_val = next(self._datapipe_iterator)  # type: ignore[arg-type]
ValueError: generator already executing
This exception is thrown by __iter__ of PrefetcherIterDataPipe(buffer_size=2, source_datapipe=_ChildDataPipe)
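
The final `ValueError: generator already executing` is what CPython raises whenever `next()` is called on a generator that is currently running in another thread. The following is a minimal standard-library sketch of that mechanism only; it does not use torchdata, and `slow_source` and `demo` are hypothetical names made up for illustration. The two events make the overlap deterministic instead of timing-dependent.

```python
import threading


def slow_source(entered, release):
    """Generator that signals when it is running, then blocks mid-step."""
    for i in range(3):
        entered.set()    # tell the main thread we are inside the generator
        release.wait()   # stay "executing" until the main thread lets us go
        yield i


def demo():
    entered, release = threading.Event(), threading.Event()
    gen = slow_source(entered, release)
    results = []

    # Thread A (think: the first prefetcher) starts advancing the generator
    # and blocks inside it.
    worker = threading.Thread(target=lambda: results.append(next(gen)))
    worker.start()
    entered.wait()

    # The main thread plays the second prefetcher and advances the same
    # generator while it is still running in thread A.
    try:
        next(gen)
        error = None
    except ValueError as exc:
        error = str(exc)
    finally:
        release.set()
        worker.join()
    return error, results


error, results = demo()
print(error)    # generator already executing
print(results)  # [0]
```

In the traceback above, the shared generator appears to be the one advanced by `next(self._datapipe_iterator)` in `get_next_element_by_instance`, reached from two `thread_worker` threads.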

Versions

Tested in the pytorch-nightly Docker image. (It also fails on PyTorch 2.0 with torchdata 0.6.)

PyTorch version: 2.1.0.dev20230514
Is debug build: False
CUDA used to build PyTorch: 11.7
ROCM used to build PyTorch: N/A

OS: Ubuntu 20.04.6 LTS (x86_64)
GCC version: Could not collect
Clang version: Could not collect
CMake version: version 3.22.1
Libc version: glibc-2.31

Python version: 3.10.11 (main, Apr 20 2023, 19:02:41) [GCC 11.2.0] (64-bit runtime)
Python platform: Linux-5.19.0-1024-aws-x86_64-with-glibc2.31
Is CUDA available: False
CUDA runtime version: No CUDA
CUDA_MODULE_LOADING set to: N/A
GPU models and configuration: No CUDA
Nvidia driver version: No CUDA
cuDNN version: No CUDA
HIP runtime version: N/A
MIOpen runtime version: N/A
Is XNNPACK available: True

CPU:
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 48 bits physical, 48 bits virtual
CPU(s): 64
On-line CPU(s) list: 0-63
Thread(s) per core: 2
Core(s) per socket: 32
Socket(s): 1
NUMA node(s): 1
Vendor ID: AuthenticAMD
CPU family: 23
Model: 49
Model name: AMD EPYC 7R32
Stepping: 0
CPU MHz: 2799.972
BogoMIPS: 5599.94
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 1 MiB
L1i cache: 1 MiB
L2 cache: 16 MiB
L3 cache: 128 MiB
NUMA node0 CPU(s): 0-63
Vulnerability Itlb multihit: Not affected
Vulnerability L1tf: Not affected
Vulnerability Mds: Not affected
Vulnerability Meltdown: Not affected
Vulnerability Mmio stale data: Not affected
Vulnerability Retbleed: Mitigation; untrained return thunk; SMT enabled with STIBP protection
Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointer sanitization
Vulnerability Spectre v2: Mitigation; Retpolines, IBPB conditional, STIBP always-on, RSB filling, PBRSB-eIBRS Not affected
Vulnerability Srbds: Not affected
Vulnerability Tsx async abort: Not affected
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid aperfmperf tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a misalignsse 3dnowprefetch topoext ssbd ibrs ibpb stibp vmmcall fsgsbase bmi1 avx2 smep bmi2 rdseed adx smap clflushopt clwb sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr rdpru wbnoinvd arat npt nrip_save rdpid

Versions of relevant libraries:
[pip3] numpy==1.24.3
[pip3] torch==2.1.0.dev20230514
[pip3] torchaudio==2.1.0.dev20230514
[pip3] torchdata==0.7.0.dev20230514
[pip3] torchelastic==0.2.2
[pip3] torchtext==0.16.0.dev20230514
[pip3] torchvision==0.16.0.dev20230514
[pip3] triton==2.1.0
[conda] blas 1.0 mkl
[conda] mkl 2023.1.0 h6d00ec8_46342
[conda] mkl-service 2.4.0 py310h5eee18b_1
[conda] mkl_fft 1.3.6 py310h1128e8f_1
[conda] mkl_random 1.2.2 py310h1128e8f_1
[conda] numpy 1.24.3 py310h5f9d8c6_1
[conda] numpy-base 1.24.3 py310hb5e798b_1
[conda] pytorch 2.1.0.dev20230514 py3.10_cuda11.7_cudnn8.5.0_0 pytorch-nightly
[conda] pytorch-cuda 11.7 h778d358_5 pytorch-nightly
[conda] pytorch-mutex 1.0 cuda pytorch-nightly
[conda] torchaudio 2.1.0.dev20230514 py310_cu117 pytorch-nightly
[conda] torchdata 0.7.0.dev20230514 py310 pytorch-nightly
[conda] torchelastic 0.2.2 pypi_0 pypi
[conda] torchtext 0.16.0.dev20230514 py310 pytorch-nightly
[conda] torchtriton 2.1.0+7d1a95b046 py310 pytorch-nightly
[conda] torchvision 0.16.0.dev20230514 py310_cu117 pytorch-nightly

ejguan commented 1 year ago

Thanks for reporting it. I can confirm this race condition in fork. To unblock your use case, can you please zip the forks first and prefetch afterwards:

# Named `zipped` so it does not shadow the `dp` module alias from the report.
zipped = fork1.zip(fork2)
zipped = zipped.prefetch(2)

IMHO, this is caused by fork holding a single reference to the generator of the prior DataPipe, which both prefetch threads then advance concurrently. It can be resolved by making _ForkerIterDataPipe.get_next_element_by_instance a normal function that returns the next value rather than a generator function, and by having _ChildDataPipe.__iter__ run a while-loop that calls get_next_element_by_instance once per iteration.
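
To make the proposed direction concrete, here is a rough standard-library sketch of it. This is not the torch implementation. `ForkSketch` and `ChildSketch` are hypothetical, heavily simplified stand-ins for `_ForkerIterDataPipe` and `_ChildDataPipe`, and the lock and the unbounded per-child buffers are assumptions made for this example (the real fork has a bounded buffer and more bookkeeping).

```python
import threading
from collections import deque


class ForkSketch:
    """Hypothetical, simplified stand-in for _ForkerIterDataPipe."""

    _DONE = object()  # sentinel: no more items for this child

    def __init__(self, source, num_instances):
        self._it = iter(source)
        self._buffers = [deque() for _ in range(num_instances)]
        self._lock = threading.Lock()  # assumption: not part of the proposal text
        self._exhausted = False

    def get_next_element_by_instance(self, instance_id):
        # A plain function: each call returns one value (or _DONE), so no
        # generator frame is shared between the consuming threads.
        with self._lock:
            buf = self._buffers[instance_id]
            if not buf and not self._exhausted:
                try:
                    item = next(self._it)
                except StopIteration:
                    self._exhausted = True
                else:
                    for other in self._buffers:
                        other.append(item)  # every child sees every item
            return buf.popleft() if buf else self._DONE


class ChildSketch:
    """Hypothetical stand-in for _ChildDataPipe."""

    def __init__(self, main, instance_id):
        self._main, self._id = main, instance_id

    def __iter__(self):
        # While-loop that asks the parent for one element per iteration.
        while True:
            item = self._main.get_next_element_by_instance(self._id)
            if item is ForkSketch._DONE:
                return
            yield item


# Two threads drain the two children concurrently, as two prefetchers would.
fork = ForkSketch((i * i for i in range(50)), 2)
outs = [[], []]
threads = [
    threading.Thread(target=outs[k].extend, args=(ChildSketch(fork, k),))
    for k in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(outs[0] == outs[1])  # True
```

Each child generator here is only ever advanced by its own thread, and the shared source iterator is only advanced while the lock is held, so the overlapping `next()` calls behind the reported error cannot occur in this sketch.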