Lightning-AI / litdata

Transform datasets at scale. Optimize datasets for fast AI model training.
Apache License 2.0

`optimize()` with `num_workers > 1` leads to deletion issues #245

Open awaelchli opened 1 month ago

awaelchli commented 1 month ago

🐛 Bug

In the LitData tests, we only ever call optimize() with num_workers=1. In PR #237 I found that if optimize() is called with more workers, we get what looks like a race condition that causes some chunks to be deleted, after which streaming fails. https://github.com/Lightning-AI/litdata/pull/237#discussion_r1684570860

This happens in this test: https://github.com/Lightning-AI/litdata/blob/c58b67346a3be22de26679fb6788f38894c47cd1/tests/streaming/test_dataset.py#L826 (see ToDo comments).

The test fails with

__________________ test_dataset_resume_on_future_chunks[True] __________________

shuffle = True
tmpdir = local('/tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0')
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7f6a4124f460>

    @pytest.mark.skipif(sys.platform == "win32", reason="Not tested on windows and MacOs")
    @mock.patch.dict(os.environ, {}, clear=True)
    @pytest.mark.timeout(60)
    @pytest.mark.parametrize("shuffle", [True, False])
    def test_dataset_resume_on_future_chunks(shuffle, tmpdir, monkeypatch):
        """This test is constructed to test resuming from a chunk past the first chunk, when subsequent chunks don't have
        the same size."""
        s3_cache_dir = str(tmpdir / "s3cache")
        optimize_data_cache_dir = str(tmpdir / "optimize_data_cache")
        optimize_cache_dir = str(tmpdir / "optimize_cache")
        data_dir = str(tmpdir / "optimized")
        monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", optimize_data_cache_dir)
        monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", optimize_cache_dir)

>       optimize(
            fn=_simple_preprocess,
            inputs=list(range(8)),
            output_dir=data_dir,
            chunk_size=190,
            num_workers=4,
            num_uploaders=1,
copying /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-3-1.bin to /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimized/chunk-3-1.bin
putting /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-3-1.bin on the remove queue
Worker 1 is done.
Worker 2 is done.
Worker 3 is done.
Worker 0 is done.
Workers are finished.
----------------------------- Captured stderr call -----------------------------

Progress:   0%|          | 0/8 [00:00<?, ?it/s]Process Process-85:1:
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 259, in _upload_fn
    shutil.copy(local_filepath, output_filepath)
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/shutil.py", line 427, in copy
    copyfile(src, dst, follow_symlinks=follow_symlinks)
  File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/shutil.py", line 264, in copyfile
    with open(src, 'rb') as fsrc:
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-0-0.bin'

Progress: 100%|██████████| 8/8 [00:00<00:00, 122.77it/s]
=========================== short test summary info ============================
FAILED tests/streaming/test_dataset.py::test_dataset_resume_on_future_chunks[True] - RuntimeError: All the chunks should have been deleted. Found ['chunk-0-1.bin']
====== 1 failed, 191 passed, 8 skipped, 11 warnings in 247.94s (0:04:07) =======

when setting optimize(num_workers=4). This needs to be investigated. However, I have not been able to reproduce it locally so far (only observed in CI)!
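
The failure mode in the traceback can be illustrated outside litdata. This is only a minimal sketch, not litdata's actual upload or cleanup code: `remove_chunk` and `copy_chunk` are hypothetical names, and the two steps run sequentially here so the outcome is deterministic. It assumes the cleanup step wins the race and removes a chunk from the cache before the upload step copies it, in which case `shutil.copy` raises the same `FileNotFoundError`.

```python
import os
import shutil
import tempfile


def remove_chunk(path):
    """Hypothetical cleanup step: delete a chunk from the cache folder."""
    os.remove(path)


def copy_chunk(src, dst_dir):
    """Hypothetical upload step: copy a chunk from the cache to the output folder."""
    return shutil.copy(src, os.path.join(dst_dir, os.path.basename(src)))


def simulate_remove_before_copy():
    """Return the name of the exception raised when removal happens first."""
    with tempfile.TemporaryDirectory() as root:
        cache_dir = os.path.join(root, "optimize_cache")
        output_dir = os.path.join(root, "optimized")
        os.makedirs(cache_dir)
        os.makedirs(output_dir)

        chunk = os.path.join(cache_dir, "chunk-0-0.bin")
        with open(chunk, "wb") as f:
            f.write(b"data")

        remove_chunk(chunk)  # cleanup runs first ...
        try:
            copy_chunk(chunk, output_dir)  # ... so the copy finds no source file
        except FileNotFoundError as err:
            return type(err).__name__
    return None


if __name__ == "__main__":
    print(simulate_remove_before_copy())
```

Whether this ordering is what actually happens in CI is not confirmed here; the sketch only shows that it would produce the observed error.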

awaelchli commented 1 month ago

Some more evidence in another (rarer, flaky) test that uses num_workers=2:

https://github.com/Lightning-AI/litdata/actions/runs/10013150667/job/27680138130

_______ test_dataset_for_text_tokens_distributed_num_workers_end_to_end ________

tmpdir = local('/private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1')
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x121e83bb0>

    def test_dataset_for_text_tokens_distributed_num_workers_end_to_end(tmpdir, monkeypatch):
        monkeypatch.setattr(functions, "_get_input_dir", lambda x: str(tmpdir))

        seed_everything(42)

        with open(tmpdir / "a.txt", "w") as f:
            f.write("hello")

        inputs = [(v, str(tmpdir / "a.txt")) for v in range(0, 200, 20)]

        cache_dir = os.path.join(tmpdir, "cache")
        output_dir = os.path.join(tmpdir, "target_dir")
        os.makedirs(output_dir, exist_ok=True)
        monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", cache_dir)
        monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", cache_dir)

>       functions.optimize(
            optimize_fn, inputs, output_dir=str(tmpdir), num_workers=2, chunk_size=2, reorder_files=False, num_downloaders=1
        )

tests/streaming/test_dataset.py:596: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
src/litdata/processing/functions.py:432: in optimize
    data_processor.run(
src/litdata/processing/data_processor.py:1055: in run
    self._exit_on_error(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <litdata.processing.data_processor.DataProcessor object at 0x121edb6a0>
error = 'Traceback (most recent call last):\n  File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor....te/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1/cache\'\n'

    def _exit_on_error(self, error: str) -> None:
        for w in self.workers:
            # w.join(0)
            w.terminate()  # already error has occurred. So, no benefit of processing further.
>       raise RuntimeError(f"We found the following error {error}.")
E       RuntimeError: We found the following error Traceback (most recent call last):
E         File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 435, in run
E           self._setup()
E         File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 444, in _setup
E           self._create_cache()
E         File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 511, in _create_cache
E           os.makedirs(self.cache_data_dir, exist_ok=True)
E         File "/Users/runner/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/os.py", line 225, in makedirs
E           mkdir(name, mode)
E       FileExistsError: [Errno 17] File exists: '/private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1/cache'
E       .

src/litdata/processing/data_processor.py:1119: RuntimeError
----------------------------- Captured stdout call -----------------------------
Create an account on https://lightning.ai/ to optimize your data faster using multiple nodes and large machines.
Storing the files under /private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1
Setup started with fast_dev_run=False.
Setup finished in 0.001 seconds. Found 10 items to process.
Starting 2 workers with 10 items. The progress bar is only updated when a worker finishes.
Workers are ready ! Starting data processing...
Worker 1 is done.
----------------------------- Captured stderr call -----------------------------

Progress:   0%|          | 0/10 [00:00<?, ?it/s]
=========================== short test summary info ============================
FAILED tests/streaming/test_dataset.py::test_dataset_for_text_tokens_distributed_num_workers_end_to_end - RuntimeError: We found the following error Traceback (most recent call last):
  File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 435, in run
    self._setup()
  File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 444, in _setup
    self._create_cache()
  File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 511, in _create_cache
    os.makedirs(self.cache_data_dir, exist_ok=True)
  File "/Users/runner/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/os.py", line 225, in makedirs
    mkdir(name, mode)
FileExistsError: [Errno 17] File exists: '/private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1/cache'
.

hiyyg commented 1 month ago

Hi, I also ran into a similar issue with num_workers > 1. How can we resolve it?

hiyyg commented 1 month ago

I also found the issue happens when num_workers = 1. For now I can only run with num_workers = 0.

BDannowitzHudl commented 1 month ago

I'm also experiencing this issue:

[Errno 2] No such file or directory: '/tmp/[...]/chunk-0-0.bin'

This error goes away when I set my num_workers = 1

hiyyg commented 1 month ago

It seems I can bypass this error with

os.environ["DATA_OPTIMIZER_CACHE_FOLDER"] = f"/tmp/{__name__}"
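
A variant of this workaround, as a rough sketch: give each run its own fresh cache directory instead of a fixed path. The helper name `use_unique_optimizer_cache` and the prefix are hypothetical; only the `DATA_OPTIMIZER_CACHE_FOLDER` variable comes from this thread. It is an assumption that the variable has to be set before optimize() is called, and it is not confirmed that this fixes the underlying race rather than just avoiding it.

```python
import os
import tempfile


def use_unique_optimizer_cache(prefix="litdata_optimize_"):
    """Hypothetical helper: point the optimizer cache at a fresh, empty directory."""
    # mkdtemp creates a new directory with a unique name on every call.
    cache_dir = tempfile.mkdtemp(prefix=prefix)
    os.environ["DATA_OPTIMIZER_CACHE_FOLDER"] = cache_dir
    return cache_dir


if __name__ == "__main__":
    # Call this before optimize() so that the run does not share a cache folder.
    print(use_unique_optimizer_cache())
```

The directory is not cleaned up automatically, so remove it once the run has finished.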

deependujha commented 1 month ago

For the second issue related to test_dataset_for_text_tokens_distributed_num_workers_end_to_end:

I think this is just a weird bug.

We are using os.makedirs(SOME_PATH, exist_ok=True), so even if the directory already exists, it shouldn't raise an error. But sometimes it raises an error anyway.

I don't think it has anything to do with num_workers.

I faced this issue a couple of times, and from what I remember, it only failed on macOS. So I added a couple of if os.path.exists(): checks before calling os.makedirs. This pattern is present in a number of files.
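
For context, `os.makedirs(path, exist_ok=True)` still raises `FileExistsError` when the path exists but is not a directory at the moment it checks. One possible explanation for the flaky failure (an assumption, not confirmed in this thread) is that another process removed or replaced the directory between the failed mkdir and that check. The sketch below shows the deterministic case with a regular file, plus a hypothetical `ensure_dir` helper that retries instead of relying on a separate `os.path.exists()` check.

```python
import os
import tempfile
import time


def ensure_dir(path, attempts=3, delay=0.05):
    """Hypothetical helper: create `path` as a directory, retrying on races."""
    for attempt in range(attempts):
        try:
            os.makedirs(path, exist_ok=True)
            return path
        except FileExistsError:
            # A regular file at `path` will never become a directory, and the
            # last attempt has nothing left to retry: re-raise in both cases.
            if os.path.isfile(path) or attempt == attempts - 1:
                raise
            time.sleep(delay)  # give a concurrent creator/remover time to settle
    raise ValueError("attempts must be at least 1")


def makedirs_raises_on_file():
    """Show that exist_ok=True does not cover a path that is a regular file."""
    with tempfile.TemporaryDirectory() as root:
        path = os.path.join(root, "cache")
        with open(path, "w") as f:
            f.write("not a directory")
        try:
            os.makedirs(path, exist_ok=True)
        except FileExistsError:
            return True
    return False


if __name__ == "__main__":
    print(makedirs_raises_on_file())
```

A retry only helps if the error is transient; if something keeps deleting the directory, the cause still needs to be found.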

lxr2 commented 1 month ago

Same issue on my Ubuntu 16 server with num_workers=16. It doesn't always happen, and one way to solve it is to just rerun the code.

Pseudocode:

import os

import litdata as ld
from PIL import Image

# Note: slide_ids, patch_size, stride_size, color2label, patch_dir, and
# split_image_into_patches() are defined elsewhere and are not shown here.


def process_patch(input_data):
    img_patch, mask_patch, color2label = input_data

    img_patch = img_patch.convert("RGB")
    mask_patch = mask_patch.convert("RGB")

    # Look up the label from the mask color at the center pixel of the patch.
    w, h = mask_patch.size
    pixel = mask_patch.getpixel((w // 2, h // 2))
    label_text = color2label.get(pixel, "BG")

    # Background patches get no label; return None for them.
    if label_text == "BG":
        return None

    # The numeric label is the position of the color in color2label.
    label = list(color2label.keys()).index(pixel)

    return (img_patch, label)


for slide_id in slide_ids:
    img_path = slide_id + "_HE.jpg"
    mask_path = slide_id + "_mask.jpg"

    img = Image.open(img_path)
    mask = Image.open(mask_path)
    img_patches = split_image_into_patches(img, patch_size, stride_size)
    mask_patches = split_image_into_patches(mask, patch_size, stride_size)

    # One input per (image patch, mask patch) pair.
    input_data = [(img_patch, mask_patch, color2label) for img_patch, mask_patch in zip(img_patches, mask_patches)]

    ld.optimize(
        fn=process_patch,
        inputs=input_data,
        output_dir=os.path.join(patch_dir, slide_id),
        num_workers=min(os.cpu_count() or 1, 16),  # os.cpu_count() can return None
        mode="overwrite",
        compression="zstd",
        chunk_bytes="64MB",
    )