mosaicml / streaming

A Data Streaming Library for Efficient Neural Network Training
https://streaming.docs.mosaicml.com
Apache License 2.0

MDS: standard ? #164

Open rom1504 opened 1 year ago

rom1504 commented 1 year ago

Hi, thanks for the lib!

I think the two main alternatives in the PyTorch world are webdataset and torchdata. Both support tar files as the shard format. The benefit of tar is that it's a standard supported by hundreds of tools.

You decided instead to go with your own custom format for sharding. Do you have some pointers as to:

  1. Why is that format better?
  2. What is that format exactly? Is there a spec?
  3. What are some ways to work with MDS files outside of this streaming library?

I noticed that you used img2dataset for downloading laion400m, great! Maybe we could add support for the MDS file type directly in img2dataset to avoid having to do a (costly) conversion after the fact.

knighton commented 1 year ago

Thanks for reaching out!

With massive datasets, our serialization format choices are critical to the observed performance of the system. If we really care about performance, we must own the format ourselves. Tar is certainly the general way to pack many blobs into a download-friendly form. But our use case, ML training, is much more specific: we also need extremely low latency cold random access at the granularity of individual samples, which all have the same fields.

Given those requirements, the high-level design: we have an index file and many shard files. Each shard file is a standalone dataset. The index file is a small JSON listing of shard metadata (including each shard's filename and sample count) that concatenates the shards into one big dataset. A dataset is conceptualized as a table in which each row is a sample. Columns have data types, which deserialize into int, str, PIL Image, etc.
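
For a concrete picture, the index could look roughly like the following, written here as a Python dict. The field names are an assumption for illustration, not an authoritative schema:

```python
# Hypothetical shape of an MDS index.json, for illustration only;
# the real on-disk schema may use different field names.
index = {
    'version': 2,
    'shards': [
        {
            'raw_data': {'basename': 'shard.00000.mds', 'bytes': 67092637},
            'samples': 4093,                      # sample count in this shard
            'column_names': ['image', 'caption'],
            'column_encodings': ['jpeg', 'str'],
        },
        # ... one entry per shard; in order, these concatenate into the dataset
    ],
}
```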

For scalability, the shards are never loaded into memory; instead, we read samples on the fly directly from disk. We store the byte offset of each sample at the head of the file as an int array. In the body of the file, samples are stored contiguously for fast random access with unbuffered I/O. In between the two is a "neck" containing a tiny JSON dict of shard metadata (like column names and data types), which is skipped over. In the event the index is lost, it can be recovered from the data in the neck (this has happened).

The index (the list of shard metadata) is kept in memory, including a dynamically generated O(1) lookup table from sample ID ranges to shards. On StreamingDataset.__getitem__, we turn the global sample index into a shard and a sample index within that shard, seek directly to the sample's (begin, end) offsets in the file header, then seek to the sample data. Keeping shard metadata dicts in the index allows other bells and whistles around compression, data integrity, and so on, but the core thing is the very fast "two seek" random access.
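
To illustrate the "two seek" access, here is a minimal sketch in Python. It assumes a simplified layout in which the file head is a bare array of little-endian uint32 offsets; the real MDS reader also handles the neck, typed columns, compression, and so on:

```python
import struct

def read_sample(path, idx, num_samples):
    # Sketch of "two seek" random access, assuming the head of the file is
    # an array of (num_samples + 1) little-endian uint32 byte offsets, so
    # that offsets[idx] and offsets[idx + 1] bound sample idx. The real MDS
    # layout also includes a metadata "neck" and typed columns.
    assert 0 <= idx < num_samples
    with open(path, 'rb', buffering=0) as f:   # unbuffered I/O
        f.seek(idx * 4)                        # seek 1: into the offset array
        begin, end = struct.unpack('<II', f.read(8))
        f.seek(begin)                          # seek 2: to the sample bytes
        return f.read(end - begin)
```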

The downside of the precomputed offsets is that once written, shards are set in stone. But there is nothing wrong with writing a new shard. A common idiom is to walk the samples of an existing dataset, doing some processing or filtering, and write those samples out as a new dataset. Recall that the index is simple, hackable JSON; by modifying this listing, one can subset, concatenate, etc. these datasets.
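
For example, concatenating two datasets could be as simple as merging their index listings, as in this sketch (assuming an index shaped like the dict above and both datasets' shard files gathered into one directory):

```python
import json

# Sketch: concatenate two MDS datasets by merging their JSON indexes.
# Assumes both datasets share the same version and columns, and that
# their shard files have been copied into the 'merged' directory.
with open('dataset_a/index.json') as f:
    a = json.load(f)
with open('dataset_b/index.json') as f:
    b = json.load(f)

merged = {'version': a['version'], 'shards': a['shards'] + b['shards']}

with open('merged/index.json', 'w') as f:
    json.dump(merged, f)
```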

Why is that format better?

Performance! How does this script fare on your system? https://github.com/mosaicml/streaming/blob/main/scripts/serialization/compare.py

For features, see the recent blog post https://www.mosaicml.com/blog/mosaicml-streamingdataset if you haven't already.

If you don't like MDS, you can use other formats instead, like JSONL or CSV, and StreamingDataset will merrily take your JSONL or CSV shards. The catch is that each JSONL/CSV file has to be paired with an additional file that we generate containing precise sample byte offset info, exactly as in the header/"neck" portion described above. We haven't made much use of this instead of MDS as of yet, but the option is there.
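
To sketch what that offset info amounts to for a JSONL shard (purely illustrative; the actual companion file streaming generates may be formatted differently):

```python
def jsonl_offsets(path):
    # Compute per-sample byte offsets for a JSONL shard, one sample per line.
    # Illustrative only; the companion file streaming writes may differ.
    offsets = [0]
    with open(path, 'rb') as f:
        for line in f:
            offsets.append(offsets[-1] + len(line))
    return offsets  # offsets[i] and offsets[i + 1] bound sample i
```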

What is that format exactly? Is there a spec?

Sorry, do you have some good examples of specs in mind that you could link us to? You are not the first to ask these questions, and we need to do better than pointing people at code.

What are some ways to work with MDS files outside of this streaming library?

StreamingDataset can be used directly, without a DataLoader, lazily downloading shards on sample access. It is also fine not to "stream" at all, just pointing it at local files.
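
Roughly like this (a sketch; argument names per the README around this time):

```python
from streaming import StreamingDataset

# Lazily downloads shards into the local cache as samples are touched.
dataset = StreamingDataset(remote='s3://my-bucket/my-dataset', local='/tmp/cache')

sample = dataset[1337]   # random access; fetches the containing shard if needed
print(len(dataset), sample.keys())

# Or don't stream at all: point it at an already-local copy.
local_only = StreamingDataset(local='/datasets/my-dataset')
```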

If you don't like the intricate shards/samples-to-nodes/ranks/workers partitioning logic and just want a local map-style Dataset, see https://github.com/mosaicml/streaming/blob/main/streaming/base/local.py
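
A minimal sketch, assuming the map-style class defined in that file is importable as LocalDataset and takes a local directory:

```python
from streaming.base.local import LocalDataset

dataset = LocalDataset(local='/datasets/my-dataset')
sample = dataset[0]   # plain map-style access, no nodes/ranks/workers logic
```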

Do you have any specific data processing use cases in mind that we aren't covering? We would be happy to gather any feedback you can share.

knighton commented 1 year ago

Maybe we could add support for the MDS file type directly in img2dataset to avoid having to do a (costly) conversion after the fact.

This sounds like a great idea! We’d love to work with you on this if you think this is useful.

rom1504 commented 1 year ago

I see, thanks for the thorough explanation!

we also need extremely low latency cold random access at the granularity of individual samples, which all have the same fields.

We store the byte offset of each sample at the head of the file as an int array. In the body of the file, samples are stored contiguously for fast random access with unbuffered I/O.

This sounds like you are caching shards in memory or on local SSD. That's a very strong requirement for large image datasets. For example, laion5B at 384x384 is 300TB.

That would be a lot of SSD drives to get random access.

For bigger datasets (other modalities or more samples), it means even more high-cost storage (RAM or SSD).

I'm curious if you have any thoughts on that. It seems to me that sequential access is way cheaper than random access for >100TB datasets. (Sequential access is served well by HDDs or by high-latency systems like GCS or S3.)

knighton commented 1 year ago

Agreed, fully random access is not suitable for training, given that samples are stored remotely in shards. However, numpy-style access has come in handy for slicing and dicing datasets in parallel and for introspection, and it gives us complete flexibility when designing shard-aware shuffling algorithms.

At a high level, StreamingDataset shuffles tend to exhaust one repeat of a shard at a time for download-efficiency reasons, with shard repeats (you have multiple repeats when the upsampling rate is >1) scattered across the epoch. Splitting up the repeats of a shard avoids negatively impacting convergence when heavily upsampling data subsets on the fly.

We use a bucket of tricks (shuffle block size, a high num_canonical_nodes, etc.) to achieve a very resilient "random" shuffle while keeping the number of shards that need to be resident at a very practical level (where we are not blocked on downloading). To keep cache usage low, we are in the process of adding cold shard eviction, which has been somewhat nontrivial given our requirements...
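
To illustrate the general idea (not StreamingDataset's exact algorithm): randomize the shard order, lay the samples out shard by shard, then shuffle only within fixed-size blocks, so only a bounded window of shards needs to be resident at any moment:

```python
import random

def block_shuffle(shard_sizes, block_size, seed=0):
    # Illustrative shard-aware shuffle, not StreamingDataset's exact algorithm.
    rng = random.Random(seed)
    order = list(range(len(shard_sizes)))
    rng.shuffle(order)                  # randomize the shard order
    # Prefix sums give each shard's base global sample ID.
    bases = [0]
    for n in shard_sizes:
        bases.append(bases[-1] + n)
    ids = [bases[s] + i for s in order for i in range(shard_sizes[s])]
    # Shuffle within fixed-size blocks only, bounding resident shards.
    for start in range(0, len(ids), block_size):
        block = ids[start:start + block_size]
        rng.shuffle(block)
        ids[start:start + block_size] = block
    return ids
```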

jacobwjs commented 1 year ago

@rom1504 These are rather timely questions for me that you posed. Curious whether you (or someone else) are planning on integrating this into img2dataset as an option beyond webdataset?

seralf commented 4 months ago

Hi, a very nice explanation of several aspects here! Thank you @knighton for the details, as some of them were not that clear to me from exploring the documentation and reading some of the code.

I'm curious about these passages:

Why is that format better?

Performance! How does this script fare on your system? https://github.com/mosaicml/streaming/blob/main/scripts/serialization/compare.py

Do you have plans to do a comparison against Avro? I had the chance to work with it in the past, and I see some similarity with the description of how MDS was conceived (in particular, the attention to being "streamable", and the explicitly embedded metadata).

If you don't like MDS, you can use other formats instead, like JSONL or CSV, and StreamingDataset will merrily take your JSONL or CSV shards. The catch is that each JSONL/CSV file has to be paired with an additional file that we generate containing precise sample byte offset info, exactly as in the header/"neck" portion described above. We haven't made much use of this instead of MDS as of yet, but the option is there.

Can you suggest any pointers in the code to better understand how this works?

nice work! :-)

Alfredo