Lightning-AI / litdata

Streamline data pipelines for AI. Process datasets across 1000s of machines, and optimize data for blazing fast model training.
Apache License 2.0
249 stars · 24 forks

Feat: Append data to pre-optimized dataset #184

Closed deependujha closed 4 days ago

deependujha commented 5 days ago
Before submitting

- [x] Was this discussed/agreed via a GitHub issue? (no need for typos and docs improvements)
- [ ] Did you read the [contributor guideline](https://github.com/Lightning-AI/lit-data/blob/main/.github/CONTRIBUTING.md), Pull Request section?
- [ ] Did you make sure to update the docs?
- [x] Did you write any new necessary tests?

What does this PR do?

Fixes #23.

Adds support for appending to, or overwriting, a pre-optimized dataset via a new `mode` argument (`"append"` or `"overwrite"`) on `optimize`.
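A minimal usage sketch of the new behavior (assuming the top-level `litdata` imports; the local output path used here is purely illustrative):

```python
from litdata import StreamingDataset, optimize


def compress(index):
    return index, index**2


# First run: create the optimized dataset (hypothetical local path).
optimize(
    fn=compress,
    inputs=list(range(100)),
    output_dir="my_optimized_dataset",
    chunk_bytes="64MB",
    num_workers=1,
)

# Later run: add more samples to the same dataset instead of erroring out.
optimize(
    fn=compress,
    inputs=list(range(100, 200)),
    output_dir="my_optimized_dataset",
    chunk_bytes="64MB",
    num_workers=1,
    mode="append",  # or "overwrite" to replace the existing chunks
)

ds = StreamingDataset("my_optimized_dataset")
assert len(ds) == 200
```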

Expand here for a testcase that conveys the feature best:

```python
import os

import pytest

from litdata import StreamingDataset, optimize


def test_optimize(tmpdir):
    output_dir = str(tmpdir / "output_dir")  # or s3 URI

    def compress(index):
        return index, index**2

    def different_compress(index):
        return index, index**2, index**3

    # Initial optimization run creates the dataset.
    optimize(
        fn=compress,
        inputs=list(range(100)),
        num_workers=1,
        output_dir=output_dir,
        chunk_bytes="64MB",
    )

    ds = StreamingDataset(output_dir)
    assert len(ds) == 100
    assert ds[:] == [(i, i**2) for i in range(100)]

    # Re-running without a mode on an existing dataset raises an error.
    with pytest.raises(RuntimeError, match="HINT: If you want to append/overwrite to the existing dataset"):
        optimize(
            fn=compress,
            inputs=list(range(100, 200)),
            num_workers=1,
            output_dir=output_dir,
            chunk_bytes="64MB",
        )

    # Only `append` and `overwrite` are accepted modes.
    with pytest.raises(ValueError, match="The provided `mode` should be either `append` or `overwrite`"):
        optimize(
            fn=compress,
            inputs=list(range(100, 200)),
            num_workers=1,
            output_dir=output_dir,
            chunk_bytes="64MB",
            mode="some-random-mode",
        )

    # Overwrite replaces the existing dataset.
    optimize(
        fn=compress,
        inputs=list(range(100, 200)),
        num_workers=3,
        output_dir=output_dir,
        chunk_bytes="64MB",
        mode="overwrite",
    )

    ds = StreamingDataset(output_dir)
    assert len(ds) == 100
    assert ds[:] == [(i, i**2) for i in range(100, 200)]

    # Append adds new samples to the existing dataset.
    optimize(
        fn=compress,
        inputs=list(range(200, 300)),
        num_workers=os.cpu_count(),
        output_dir=output_dir,
        chunk_bytes="64MB",
        mode="append",
    )

    ds = StreamingDataset(output_dir)
    assert len(ds) == 200
    assert ds[:] == [(i, i**2) for i in range(100, 300)]

    optimize(
        fn=compress,
        inputs=list(range(300, 400)),
        num_workers=2,
        output_dir=output_dir,
        chunk_bytes="64MB",
        mode="append",
    )

    ds = StreamingDataset(output_dir)
    assert len(ds) == 300
    assert ds[:] == [(i, i**2) for i in range(100, 400)]

    # Appending data with an incompatible config is rejected and leaves the dataset untouched.
    with pytest.raises(Exception, match="The config isn't consistent between chunks"):
        optimize(
            fn=different_compress,
            inputs=list(range(100, 200)),
            num_workers=1,
            output_dir=output_dir,
            chunk_bytes="64MB",
            mode="append",
        )

    ds = StreamingDataset(output_dir)
    assert len(ds) == 300
    assert ds[:] == [(i, i**2) for i in range(100, 400)]

    # Overwrite works even when the new config differs from the old one.
    optimize(
        fn=different_compress,
        inputs=list(range(800, 900)),
        num_workers=1,
        output_dir=output_dir,
        chunk_bytes="64MB",
        mode="overwrite",
    )

    ds = StreamingDataset(output_dir)
    assert len(ds) == 100
    assert ds[:] == [(i, i**2, i**3) for i in range(800, 900)]
```

The above testcase is also included at litdata/tests/processing/test_functions.py:48.

PR review

Anyone in the community is free to review the PR once the tests have passed. If we didn't discuss your PR in GitHub issues, there's a high chance it will not be merged.

Did you have fun?

Make sure you had fun coding 🙃

codecov[bot] commented 4 days ago

Codecov Report

Attention: Patch coverage is 76.47059% with 20 lines in your changes missing coverage. Please review.

Please upload report for BASE (main@a8f33df). Learn more about missing BASE report.

Additional details and impacted files

```diff
@@           Coverage Diff           @@
##             main    #184   +/-   ##
=======================================
  Coverage        ?     78%
=======================================
  Files           ?      33
  Lines           ?    4399
  Branches        ?       0
=======================================
  Hits            ?    3414
  Misses          ?     985
  Partials        ?       0
```