pytorch / data

A PyTorch repo for data loading and utilities to be shared by the PyTorch domain libraries.

running FSSpecFileLister in ikernel doesn't work #497

Open sephib opened 2 years ago

sephib commented 2 years ago

🐛 Describe the bug

Hi, this bug follows the conversation on discuss.pytorch.org. When running the following code in a Jupyter kernel, the `fs.protocol` is not consistent.

To reproduce, the `url_to_fs` call in the `/torchdata/datapipes/iter/load/fsspec.py` file needs to be updated:

   fs, path = fsspec.core.url_to_fs(self.root, token='/Path/to/creds/credentials.json')

Then run the following code:

from torchdata.datapipes.iter import FSSpecFileLister
image_bucket = "gs://path/to/folder"
datapipe = FSSpecFileLister(root=image_bucket, masks=['*.png'])
file_dp = datapipe.open_file_by_fsspec(mode='rb')
list(file_dp)

The second time this code is run without restarting the kernel, the URI is returned without the `gs://` prefix, but with the full path of the environment instead.

Versions

Collecting environment information...
PyTorch version: 1.11.0
Is debug build: False
CUDA used to build PyTorch: None
ROCM used to build PyTorch: N/A

OS: macOS 12.3.1 (x86_64)
GCC version: Could not collect
Clang version: 13.1.6 (clang-1316.0.21.2.3)
CMake version: Could not collect
Libc version: N/A

Python version: 3.9.13 (main, May 24 2022, 21:28:31) [Clang 13.1.6 (clang-1316.0.21.2)] (64-bit runtime)
Python platform: macOS-12.3.1-x86_64-i386-64bit
Is CUDA available: False
CUDA runtime version: No CUDA
GPU models and configuration: No CUDA
Nvidia driver version: No CUDA
cuDNN version: No CUDA
HIP runtime version: N/A
MIOpen runtime version: N/A
Is XNNPACK available: True

Versions of relevant libraries:
[pip3] facenet-pytorch==2.5.2
[pip3] mypy-extensions==0.4.3
[pip3] numpy==1.22.4
[pip3] pytorch-ignite==0.4.9
[pip3] torch==1.11.0
[pip3] torchdata==0.3.0
[pip3] torchvision==0.12.0

ejguan commented 2 years ago

Thanks for spotting the issue. Investigating it.

ejguan commented 2 years ago

I have used the same script with a public dataset on GCS, but I can't reproduce this issue. Could you please try to re-install torchdata using the nightly release? pip3 install --pre torch torchdata --extra-index-url https://download.pytorch.org/whl/nightly/cpu

sephib commented 2 years ago

The whole issue is with private datasets, not public ones.

ejguan commented 2 years ago

For the token argument, we added kwargs to FSSpecFileLister. With TorchData 0.4.0 or nightly release, you should be able to add your token there: https://github.com/pytorch/data/blob/f1a128ec789f078852943e8c58377a99b42a7b45/torchdata/datapipes/iter/load/fsspec.py#L57
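For example, something along these lines should work (a sketch only; the token path is the placeholder from above and is forwarded through the lister's kwargs to fsspec/gcsfs):

```py
# Sketch: with torchdata >= 0.4.0, credentials can be passed as kwargs to
# FSSpecFileLister instead of editing fsspec.py. The token path is a placeholder.
from torchdata.datapipes.iter import FSSpecFileLister

datapipe = FSSpecFileLister(
    root="gs://path/to/folder",
    masks=["*.png"],
    token="/Path/to/creds/credentials.json",  # forwarded to fsspec.core.url_to_fs
)
file_dp = datapipe.open_file_by_fsspec(mode="rb")
```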

Based on the discussion on the forum, it seems that there are two issues.

  1. With multiprocessing enabled, your pipeline doesn't return anything.

     def build_datapipes(root_dir=image_bucket):
         datapipe = FSSpecFileLister(root=root_dir, masks=['*.png'])
         file_dp = datapipe.open_file_by_fsspec(mode='rb')
         datapipe = file_dp.map(PIL_open)
         return datapipe

     datapipe = build_datapipes()
     dl = DataLoader(dataset=datapipe, batch_size=1, num_workers=1)

Just want to confirm that you mean the process hangs forever, right? 
  2. Re-iterating over your pipeline raises `FileNotFoundError` in the ipython kernel, but there is no such problem when running it as a script:
```py
datapipe = FSSpecFileLister(root=image_bucket, masks=['*.png'])
file_dp = datapipe.open_file_by_fsspec(mode='rb')
list(file_dp)
```

sephib commented 2 years ago

Just want to confirm that you mean the process hangs forever, right?

Correct

Re-iterating over your pipeline raises FileNotFoundError in the ipython kernel

Correct - but I'm not going to further investigate this issue

drubinstein commented 1 year ago

I have been having a similar issue with gcs. After investigating, I found that multiprocessing+gcsfs/fsspec hanging is a known issue. I've been able to get around it by making my own datapipe that uses the google-cloud-storage python package.

NivekT commented 1 year ago

I have been having a similar issue with gcs. After investigating, I found that multiprocessing+gcsfs/fsspec hanging is a known issue. I've been able to get around it by making my own datapipe that uses the google-cloud-storage python package.

Hi @drubinstein, thanks for letting us know. Just to confirm, is that specific to ikernel as well? Which multiprocessing start method are you using (i.e. spawn, fork, or forkserver)?

drubinstein commented 1 year ago

It was an experiment at the time and I didn't try anything but the defaults for DataLoader and DataLoader2 with the MultiprocessingReadingService. I can try to dig up the code and experiment with the other start methods and report back.

This was not specific to ikernel. I was running a script.

drubinstein commented 1 year ago

Ran some tests (a setup sketch follows the list below). Please tell me if there are any other permutations you'd like me to run. ✅ = training ran, ❌ = hung

  1. DataLoader + spawn ✅
  2. DataLoader + fork ❌
  3. DataLoader + forkserver ✅
  4. DataLoader2 + default reading service ✅ (also 15% slower than DataLoader)
  5. DataLoader2 + MultiProcessingReadingService + spawn ✅ (also 33% slower than DataLoader)
  6. DataLoader2 + MultiProcessingReadingService + fork ❌
  7. DataLoader2 + MultiProcessingReadingService + forkserver ✅ (also 33% slower than DataLoader)
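
For reference, a minimal sketch of how the permutations above were wired up (`build_datapipes` is the helper from earlier in the thread; the worker count is arbitrary and the keyword names follow the APIs already used in this issue):

```py
# Rough setup sketch for the permutations above, not the exact benchmark script.
from torch.utils.data import DataLoader
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

datapipe = build_datapipes()  # pipeline builder from earlier in the thread

# 1-3: DataLoader with an explicit start method ("spawn", "fork", or "forkserver")
dl = DataLoader(datapipe, batch_size=1, num_workers=4, multiprocessing_context="spawn")

# 4: DataLoader2 with the default reading service
dl2_default = DataLoader2(datapipe)

# 5-7: DataLoader2 + MultiProcessingReadingService with an explicit start method
rs = MultiProcessingReadingService(num_workers=4, multiprocessing_context="spawn")
dl2_mp = DataLoader2(datapipe, reading_service=rs)
```
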
NivekT commented 1 year ago

It makes sense that fork hangs because of the issue that you linked. However, I do not expect DataLoader2 + MultiprocessingReadingService to be slower since it currently relies on DL1 under the hood.

Can you try the same with PrototypeMultiprocessingReadingService?
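Roughly something like this (a sketch only; the class name spelling and keyword arguments are assumptions based on the prototype API in the nightly builds at the time):

```py
# Sketch only: class name and keyword arguments are assumed from the
# torchdata prototype/nightly API of that era and may differ between releases.
from torchdata.dataloader2 import DataLoader2, PrototypeMultiProcessingReadingService

rs = PrototypeMultiProcessingReadingService(num_workers=4, multiprocessing_context="spawn")
dl2 = DataLoader2(datapipe, reading_service=rs)
```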

Thanks for looking into this!

drubinstein commented 1 year ago

  1. PrototypeMultiprocessingReadingService + spawn ✅
  2. PrototypeMultiprocessingReadingService + fork ❌
  3. PrototypeMultiprocessingReadingService + forkserver ✅

Not sure if it made a difference, but I modified my pipeline to remove a prefetch() call I had at the end of it and defer to the reading service's prefetching instead.

NivekT commented 1 year ago

Thanks for running that! What was the observed performance with PrototypeMultiprocessingReadingService?

drubinstein commented 1 year ago

With spawn, MultiprocessingReadingService was ~2x slower than PrototypeMultiprocessingReadingService assuming I had all the correct arguments.

NivekT commented 1 year ago

Just to confirm, the various setups all have the same arguments for shuffle, prefetch, num_workers, etc.?

We will make a note of the result here. If you have a minimal reproducible script, it would help us reproduce and investigate. Thanks!

drubinstein commented 1 year ago

Yes. They all had the same IterDataPipe passed to the DataLoader. However, there may be arguments in the reading service I haven't accounted for.

drubinstein commented 1 year ago

I want to emphasize that this issue only appears on GCS as far as I know. I believe making a custom GcsFileLoader/GcsFileLister that avoids fsspec and uses google-cloud-storage may avoid the problem. Unfortunately, I deleted my implementation of those when moving to fsspec, but can rewrite them if you want. I can't copy the script exactly, but the shape of it looks like:

data_pipe = (
    IterableWrapper([gcs_path])
    .list_files_by_fsspec()
    .open_files_by_fsspec(mode="rb")
    .map(deserialize)
    .cycle()
    .batch()
    .collate()
)

dataset = DataLoader2(
    data_pipe,
    reading_service=MultiProcessingReadingService(4, multiprocessing_context='spawn')
)

for i, (x, y) in enumerate(dataset):
    pass

NivekT commented 1 year ago

Thanks for that! I think the performance difference between DataLoader1 and MultiProcessingReadingService is likely due to prefetch_factor (it defaults to 2 for the former and 0 for the latter, I believe). Therefore, I won't spend too much time digging into that right now.
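
For reference, a rough sketch of the two prefetch knobs involved (values and the `datapipe` name here are arbitrary placeholders):

```py
# Sketch of the prefetch knobs being compared; values are arbitrary placeholders.
from torch.utils.data import DataLoader

# DataLoader side: prefetch_factor is per worker and defaults to 2.
dl = DataLoader(datapipe, batch_size=1, num_workers=4, prefetch_factor=2)

# DataPipe side: an explicit prefetch stage can be added to the pipeline itself.
prefetching_dp = datapipe.prefetch(buffer_size=8)
```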

The fact that fork doesn't work also makes sense given the linked issue. I think we should keep a note of that.

I am curious what your custom GCSFileLoader looks like. We may be open to adding that to our library.

drubinstein commented 1 year ago

It was a copy of s3io.py, but used google-cloud-storage instead of the S3Handler. I chose the Python lib so I could avoid requiring a custom build.

drubinstein commented 1 year ago

Just found it in my git history actually! As you can tell, it's a bit unpolished :). I'm gonna run it in the meantime to see if it works.

Code

```python3
# Forked from torchdata's s3io.py
import os
from io import BytesIO
from typing import Iterator, Tuple

from google.cloud import storage

import torchdata
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe
from torchdata.datapipes.utils import StreamWrapper


@functional_datapipe("list_files_by_gcs")
class GCSFileListerIterDataPipe(IterDataPipe[str]):
    r"""
    Iterable DataPipe that lists GCS file URLs with the given prefixes (functional name: ``list_files_by_gcs``).
    Acceptable prefixes include ``gs://bucket-name``, ``gs://bucket-name/``, ``gs://bucket-name/folder``.

    Note:
        1. ``source_datapipe`` **must** contain a list of valid GCS URLs
        2. ``length`` is `-1` by default, and any call to ``__len__()`` is invalid, because the length
           is unknown until all files are iterated.

    Args:
        source_datapipe: a DataPipe that contains URLs/URL prefixes to s3 files
        length: Nominal length of the datapipe
    """

    def __init__(self, source_datapipe: IterDataPipe[str], length: int = -1, request_timeout_ms=-1, region="") -> None:
        self.source_datapipe: IterDataPipe[str] = source_datapipe
        self.length: int = length

    def __iter__(self) -> Iterator[str]:
        client = storage.Client()
        for prefix in self.source_datapipe:
            while True:
                urls = client.list_blobs(prefix)
                for url in urls:
                    yield url.path()

    def __len__(self) -> int:
        if self.length == -1:
            raise TypeError(f"{type(self).__name__} instance doesn't have valid length")
        return self.length


@functional_datapipe("load_files_by_gcs")
class GCSFileLoaderIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
    r"""
    Iterable DataPipe that loads GCS files from the given S3 URLs (functional name: ``load_files_by_gcs``).
    ``GCSFileLoader`` iterates all given GCS URLs in ``BytesIO`` format with ``(url, BytesIO)`` tuples.

    Note:
        1. ``source_datapipe`` **must** contain a list of valid S3 URLs.

    Args:
        source_datapipe: a DataPipe that contains URLs to s3 files
    """

    def __init__(
        self,
        source_datapipe: IterDataPipe[str],
    ) -> None:
        self.source_datapipe: IterDataPipe[str] = source_datapipe

    def __iter__(self) -> Iterator[Tuple[str, StreamWrapper]]:
        client = storage.Client()
        for url in self.source_datapipe:
            bucket_name = os.path.split(url)[2]
            blob_name = bucket = os.path.split(url)[3:]
            bucket = client.get_bucket(bucket_name)
            with bucket.blob(blob_name).open("rb") as blob:
                yield blob.read()

    def __len__(self) -> int:
        return len(self.source_datapipe)
```

NivekT commented 1 year ago

Thank you so much for sharing!

drubinstein commented 1 year ago

Pasting an updated script below. The one above had a few bugs in it; the version below does work with GCS + a fork multiprocessing context. I can PR it separately if you want and apply any changes you request.

Code

```python
# Forked from torchdata's s3io.py
import os
from io import BytesIO
from typing import Iterator, Tuple

from google.cloud import storage

import torchdata
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe
from torchdata.datapipes.utils import StreamWrapper


@functional_datapipe("list_files_by_gcs")
class GCSFileListerIterDataPipe(IterDataPipe[str]):
    r"""
    Iterable DataPipe that lists GCS file URLs with the given prefixes (functional name: ``list_files_by_gcs``).
    Acceptable prefixes include ``gs://bucket-name``, ``gs://bucket-name/``, ``gs://bucket-name/folder``.

    Note:
        1. ``source_datapipe`` **must** contain a list of valid GCS URIs
        2. ``length`` is `-1` by default, and any call to ``__len__()`` is invalid, because the length
           is unknown until all files are iterated.

    Args:
        source_datapipe: a DataPipe that contains URI prefixes to GCS files
        length: Nominal length of the datapipe
    """

    def __init__(self, source_datapipe: IterDataPipe[str], length: int = -1) -> None:
        self.source_datapipe: IterDataPipe[str] = source_datapipe
        self.length: int = length

    def __iter__(self) -> Iterator[str]:
        client = storage.Client()
        for path in self.source_datapipe:
            path_parts = path.split("/")
            bucket_name = path_parts[2]
            prefix = "/".join(path_parts[3:])
            for blob in client.list_blobs(bucket_name, prefix=prefix):
                yield "gs://" + os.path.join(bucket_name, blob.name)

    def __len__(self) -> int:
        if self.length == -1:
            raise TypeError(f"{type(self).__name__} instance doesn't have valid length")
        return self.length


@functional_datapipe("load_files_by_gcs")
class GCSFileLoaderIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
    r"""
    Iterable DataPipe that loads GCS files from the given S3 URLs (functional name: ``load_files_by_gcs``).
    ``GCSFileLoader`` iterates all given GCS URLs in ``BytesIO`` format with ``(url, BytesIO)`` tuples.

    Note:
        1. ``source_datapipe`` **must** contain a list of valid GCS URIs.

    Args:
        source_datapipe: a DataPipe that contains GCS URIs to GCS files
    """

    def __init__(
        self,
        source_datapipe: IterDataPipe[str],
    ) -> None:
        self.source_datapipe: IterDataPipe[str] = source_datapipe

    def __iter__(self) -> Iterator[Tuple[str, StreamWrapper]]:
        client = storage.Client()
        for url in self.source_datapipe:
            path_parts = url.split("/")
            bucket_name = path_parts[2]
            blob_name = "/".join(path_parts[3:])
            bucket = client.get_bucket(bucket_name)
            with bucket.blob(blob_name).open("rb") as blob:
                yield url, blob

    def __len__(self) -> int:
        return len(self.source_datapipe)
```

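For completeness, a hypothetical usage of the two datapipes above (the bucket/prefix is a placeholder, and it assumes the module defining them has been imported so the functional names are registered):

```py
# Hypothetical usage of the GCS datapipes above; "gs://my-bucket/prefix" is a placeholder.
from torchdata.datapipes.iter import IterableWrapper

dp = (
    IterableWrapper(["gs://my-bucket/prefix"])
    .list_files_by_gcs()    # registered via @functional_datapipe above
    .load_files_by_gcs()    # yields (url, open blob file handle) tuples
)

for url, fh in dp:
    data = fh.read()
```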