mosaicml / streaming

A Data Streaming Library for Efficient Neural Network Training
https://streaming.docs.mosaicml.com
Apache License 2.0

Augment existing dataset #646

Open LWprogramming opened 7 months ago

LWprogramming commented 7 months ago

🚀 Feature Request

Suppose we create a dataset

import random
import string

from streaming import MDSWriter

compression = 'zstd'
container_name = "foo"
folder = "bar"
remote = f'azure://{container_name}/{folder}'
columns = {
    "id": "int",
    "value": "str",
}
with MDSWriter(out=remote, columns=columns, compression=compression, size_limit=1024*1024*64) as out:
    for i in range(100):
        # make each sample take 1 MB of space, value should be a string of 1M randomly generated alphanumeric characters
        value = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(1024*1024))
        sample = {
            "id": i,
            "value": value,
        }
        out.write(sample)
# expect 2 shards: ~100 MB of samples with a 64 MB shard size limit

If we then later create a second MDSWriter to write data points 100-199 in the same way, the second MDSWriter overwrites the existing shards. The preferable behavior would be to continue writing new shards as though we had looped over all 200 samples in the original writer.
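For concreteness, something like this second writer (reusing the columns/compression from above) starts over at shard 0 and clobbers the first run's output:

with MDSWriter(out=remote, columns=columns, compression=compression, size_limit=1024*1024*64) as out:
    for i in range(100, 200):
        # same 1 MB random strings as before
        value = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(1024*1024))
        out.write({"id": i, "value": value})
# the shards written by the first run are now overwritten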

Motivation

Cleaned data comes in piecemeal, and it would be nice to be able to keep augmenting the existing cleaned dataset that has already been converted into the StreamingDataset format. I'm not sure whether this would be particularly tricky or easy to do, or whether it already exists and I'm just missing a flag somewhere.

LWprogramming commented 7 months ago

One possibility that might work, if a single data item never gets split across multiple shards, is to search the existing folder/cloud storage directory for shard names, pull the existing shards down to a temporary folder, and pick up where we left off using the index.json in that directory.
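Rough sketch of what I mean, assuming the existing index.json has already been pulled down to a temp folder (field names based on the MDS index format):

import json

# hypothetical local copy of the remote dataset's index.json
with open('/tmp/existing_dataset/index.json') as f:
    index = json.load(f)

shards = index['shards']
num_existing_shards = len(shards)
num_existing_samples = sum(shard['samples'] for shard in shards)
# a resumed writer would number its first shard after the existing ones and
# start new sample ids at num_existing_samples
print(num_existing_shards, num_existing_samples)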

Alternatively (less efficient but maybe easier to work with), we could just start a new shard (e.g. if there are shards 0.mds.zstd through 17.mds.zstd, just create 18.mds.zstd when opening the second MDSWriter). These approaches seem plausible for Azure at least; I'm not super familiar with all the different uploaders in streaming/base/storage/upload.py, or with the details of how the sample dict actually gets converted into what goes into the .mds file.

snarayan21 commented 7 months ago

@LWprogramming You can also start writing shard files to a different directory and use the merge_index function to combine the index files from multiple directories! But to your point, starting from where you left off for shard writing would be nice. We can look into it in upcoming planning.
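Roughly, that workflow might look like the following, assuming each batch of cleaned data goes to its own subdirectory and that merge_index accepts the parent directory (the exact import path and signature may differ by version):

from streaming.base.util import merge_index

# hypothetical layout:
#   azure://foo/bar/part-0/{index.json, shard.00000.mds.zstd, ...}
#   azure://foo/bar/part-1/{index.json, shard.00000.mds.zstd, ...}
root = 'azure://foo/bar'

# combine the per-subdirectory index.json files into a single index.json at
# the root, so a StreamingDataset pointed at root sees every shard
merge_index(root, keep_local=True)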

LWprogramming commented 7 months ago

Cool! Out of curiosity, when might we use something like merge_index vs. just splitting the dataset into a bunch of pieces, uploading each piece to a separate folder, and then creating a stream for each folder? For example:

from streaming import StreamingDataset, Stream
local_dirs = [
    "/foo/bar1",
    "/foo/bar2",
]
remotes = [
    "azure://foo/bar1",
    "azure://foo/bar2",
]
streams = [
    Stream(local=local, remote=remote) for local, remote in zip(local_dirs, remotes)
]
ds = StreamingDataset(streams=streams, shuffle=False)

Is the main difference in what shuffling algorithms we can use? It looks like even with multiple streams, it's possible to do dataset shuffling.
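For what it's worth, a sketch like the following seems to shuffle across both streams (shuffle_algo is my guess at the relevant keyword in recent releases, with 'py1e' as one of the available algorithms):

from streaming import StreamingDataset, Stream

streams = [
    Stream(local="/foo/bar1", remote="azure://foo/bar1"),
    Stream(local="/foo/bar2", remote="azure://foo/bar2"),
]
# shuffling still appears to work across multiple streams
ds = StreamingDataset(streams=streams, shuffle=True, shuffle_algo='py1e', batch_size=32)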