Lightning-AI / litdata

Streamline data pipelines for AI. Process datasets across 1000s of machines, and optimize data for blazing fast model training.
Apache License 2.0

Add support for Mosaic Streaming MDS data format #195

Open tchaton opened 3 days ago

tchaton commented 3 days ago

🚀 Feature

Motivation

The formats are similar, and it would be nice to be able to consume a Mosaic Streaming dataset with LitData.


bhimrazy commented 5 hours ago

Hi @tchaton,

I wanted to learn more about this feature to see if I could contribute. Is it related to making the StreamingDataLoader compatible with the StreamingDataset from Mosaic?

Or is it about converting the StreamingDataset instance from Mosaic to litdata and then consuming it further, or is it something else?

Thank you!

tchaton commented 3 hours ago

Hey @bhimrazy,

Great question. Here is what we want to achieve.

Generate an MDS dataset with MosaicML Streaming:

import numpy as np
from PIL import Image
from streaming import MDSWriter

# Local or remote directory in which to store the compressed output files
data_dir = 'path-to-dataset'

# A dictionary mapping input fields to their data types
columns = {
    'image': 'jpeg',
    'class': 'int'
}

# Shard compression, if any
compression = 'zstd'

# Save the samples as shards using MDSWriter
with MDSWriter(out=data_dir, columns=columns, compression=compression) as out:
    for i in range(10000):
        sample = {
            'image': Image.fromarray(np.random.randint(0, 256, (32, 32, 3), np.uint8)),
            'class': np.random.randint(10),
        }
        out.write(sample)

And enable reading it with LitData:

from litdata import StreamingDataset, StreamingDataLoader

# Remote path where full dataset is persistently stored
input_dir = 's3://my-bucket/path-to-dataset'

# Create streaming dataset
dataset = StreamingDataset(input_dir=input_dir, shuffle=True)

# Let's see what is in sample #1337...
sample = dataset[1337]
img = sample['image']
cls = sample['class']

# Create a StreamingDataLoader
dataloader = StreamingDataLoader(dataset)

We have roughly the same logic for encoding and decoding the chunks (with different serializers). The main difference is in how the StreamingDataset handles pre-fetching the data.

We can achieve this by detecting whether the provided config comes from MosaicML and then adapting the deserializers to theirs.
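A minimal sketch of that detection step, assuming the key layout of the two index files: MosaicML Streaming writes an `index.json` listing its shards under a `"shards"` key, while LitData's `index.json` stores its chunk metadata under a `"chunks"` key. The function name `is_mosaic_dataset` and the exact key names are assumptions for illustration, not LitData's actual API:

```python
import json
import os


def is_mosaic_dataset(input_dir: str) -> bool:
    """Heuristic check for a MosaicML Streaming (MDS) dataset directory.

    Assumption: Mosaic's MDSWriter emits an index.json with a top-level
    "shards" list, whereas LitData's index.json uses "chunks".
    """
    index_path = os.path.join(input_dir, "index.json")
    if not os.path.exists(index_path):
        return False
    with open(index_path) as f:
        index = json.load(f)
    # Treat the dataset as Mosaic-formatted only if it has "shards"
    # and lacks LitData's "chunks" key.
    return "shards" in index and "chunks" not in index
```

Once a Mosaic config is detected, the reader could swap in Mosaic-compatible deserializers for the declared column types instead of LitData's own.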

tchaton commented 2 hours ago

And @bhimrazy, if you have to copy any code from Mosaic Streaming repo, let's make sure to include full credits and the reason why the code is copied over.

bhimrazy commented 2 hours ago

Sure, @tchaton. Thanks for providing the context here. I'll go through it.

tchaton commented 59 minutes ago

Awesome @bhimrazy. Keep me updated if you are blocked.