lhotse-speech / lhotse

Tools for handling speech data in machine learning projects.
https://lhotse.readthedocs.io/en/latest/
Apache License 2.0

Draft specification of a new, extensible sequential storage format #846

Open pzelasko opened 2 years ago

pzelasko commented 2 years ago

Motivation

Currently Lhotse offers two types of data storage: random-access (JSONL manifests pointing to audio/feature files that are read individually on demand) and sequential (WebDataset-style tarfiles that bundle the metadata together with the data).

I'd like to have a format that is as efficient to read as WebDataset's tarfiles, while keeping the metadata as easy to modify as a JSONL file and easy to extend with new pieces of metadata or custom data fields (such as other types of features, quantized codebook indexes, etc.).

Implementation

Core idea

My idea is to keep the sequential storage, but store the manifest and the data in separate files. That means we could have cuts.jsonl.gz for the manifest and cuts.tar for audio/features/etc. At runtime, we read both of them in sync and populate the cuts with in-memory raw data (e.g. compressed lilcom binary blobs for features, encoded audio data, etc.), which is decoded into numpy arrays only when we call .load_X on the cut. Lhotse would need to offer an API that reads both of these files (or a directory with a standardized file layout) to keep iteration over metadata and data in sync.

The above expressed in pseudo-code:

# variant 1:
cuts = CutSet.from_new_format("cuts.jsonl.gz", data="cuts.tar")

# variant 2:
# Expected contents of my_data_dir:
# $ ls my_data_dir/
# cuts.jsonl.gz cuts.tar
cuts = CutSet.from_new_format("my_data_dir/")

# The underlying CutSet reads manifest lines and the corresponding data sequentially in sync from the two files
# The following conditions are the same as in the WebDataset-based approach:
# - the data discarded by .filter is read into memory anyway
# - the shuffle operation keeps a buffer in memory that contains the actual data in addition to the metadata
for cut in cuts.filter(lambda c: c.duration >= 5).shuffle().subset(first=1000):
    audio = cut.load_audio()  # the data was already in memory, but it's an implementation detail
    ...

Modularity extension

The idea above can be extended to support multiple sequentially read tarfiles with data, each for a single data field type, e.g.:

$ ls my_data_dir/
cuts.jsonl.gz audio.tar features.tar codebook_indexes.tar ...

This is very convenient because in order to extend the sequentially stored data + metadata with a new field, we only need to add a new tar file that contains the data, and produce a new JSONL manifest that refers to that field. I expect there will be some cost to pay in terms of I/O efficiency, but I can't really tell without testing if this is going to be a 10%, 50%, or 90% reading speed penalty.

The modular variant can also implement an API that limits which field types get pulled into memory (effectively erasing the other fields from the cuts) when the user needs only a specific subset of data fields, to optimize I/O usage.
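
To illustrate, a hypothetical sketch of what such a field-selection API could look like (from_new_format and the fields argument are made up for this proposal, not existing Lhotse API):

# Hypothetical: only the manifest and features.tar are read;
# audio.tar and codebook_indexes.tar are never opened.
cuts = CutSet.from_new_format("my_data_dir/", fields=["features"])

for cut in cuts:
    feats = cut.load_features()  # backed by the in-memory blob read from features.tar
    # cut.load_audio() would not be available, since "audio" was not requested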

Other considerations

I am considering that maybe we don't actually need the WebDataset dependency to handle all of that: we only rely on a small subset of its features related to storage, namely reading/writing tarfiles, opening pipes to subprocesses, and de-duplication of data across dataloader subprocesses. I think all of those could simply be ported to Lhotse for greater control over the low-level details.
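
For reference, the tar reading/writing part is small enough to sketch with the standard library alone; a minimal illustration (not the proposed Lhotse implementation) could look like this:

import io
import tarfile

# Write: store each cut's payload under a name derived from the cut ID.
with tarfile.open("cuts.tar", "w") as tar:
    payload = b"...encoded audio or compressed lilcom bytes..."
    info = tarfile.TarInfo(name="cut-0001.flac")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))

# Read sequentially: members come back in the order they were written.
with tarfile.open("cuts.tar", "r") as tar:
    for member in tar:
        raw = tar.extractfile(member).read()  # raw bytes; decode lazily on .load_X()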


I'd love to get some thoughts on these ideas. @danpovey @csukuangfj @janvainer @Tomiinek

danpovey commented 2 years ago

For very large scale data I think it's reasonable to limit ourselves to sequential access (and only local shuffling) and it seems to me that WebDataset would handle the I/O, and the aggregation of same-named items from different files, quite well.

A typical scenario is that someone might want to train on 50% gigaspeech, 40% librispeech, 10% switchboard (assuming they have found a way to make the transcripts compatible), and I'm thinking that each of those datasets might have been written to disk as a bunch of xxx.tar.gz files, each one containing a small part of the dataset. And perhaps different .tar.gz files for different components, e.g. lilcom features vs. audio vs. codebooks. [I'd be open to just putting the lines of the manifest into a .tar.gz, as if each one had been in one file, so that WebDataset could handle the aggregation of that at the same time as it handles, say, features vs. codebooks].

So what I'm thinking is we'd provide a way to do the mixing of data sources, and to define what an epoch is in this scenario (if we're to have such a notion), and also a way of saving the current state to disk, like which files we were in the middle of reading and how far through, so that we can resume when a training process needs to continue.
[... and also a way to gracefully handle situations when one of the files is unavailable or corrupted.]

For augmentation we might want to be reading Musan data in randomly too, I suppose in principle we could just cycle through that rather than doing it as random-access. We can just randomize the order of everything at the point when we are dumping it to the tar files, to make it unnecessary to do long-range randomization when we do data loading.

Rather than having multiple data-loaders read in the same data and discard some, it seems to me that different dataloader processes could just handle different tar.gz files, if there are multiple such files; that would be more efficient. (I don't know whether that would easily mesh with PyTorch's concept of dataloaders though.) E.g. if we are to cycle through a dataset that's dumped in .tar.gz files called a,b,c, and there are 2 dataloaders, we could view it as alternating in a stream of files that repeats, like a b c a b c, so process 1 takes a c b a c b... and process 2 takes b a c b a c. Now, if the files are different sizes, and num-loaders and num-files are not coprime, one dataloader could "get ahead" of the other so that some data would end up being seen more often than others. But that could in principle be handled by simply ensuring during initial data processing that each archive within a dataset has about the same number of files and they have similar properties. For Librispeech we could mix in the train-clean-100 with the train-other-500 data, for example, by specifying that we sample the train-clean-100 data 5 times less frequently than train-other-500, so that we cycle through them at about the same rate. [This treats them as if they were different datasets, as they have different properties and I'm assuming we prepared them separately and did not co-mix them into tar files.] Within each dataset though, say train-clean-100, we would have to ensure that the files are "properly mixed".
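
A minimal sketch of that round-robin assignment (assuming an infinitely repeated shard list, not tied to any existing Lhotse/PyTorch API):

import itertools

def worker_shards(shards, worker_id, num_workers):
    # Repeat the shard list forever (a b c a b c ...) and take every
    # num_workers-th element, starting at this worker's offset.
    return itertools.islice(itertools.cycle(shards), worker_id, None, num_workers)

# With shards a, b, c and 2 workers:
# worker 0 sees a c b a c b ..., worker 1 sees b a c b a c ...
print(list(itertools.islice(worker_shards(["a", "b", "c"], 0, 2), 6)))
print(list(itertools.islice(worker_shards(["a", "b", "c"], 1, 2), 6)))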

Incidentally, the concept of an "epoch" is a little tricky when mixing multiple datasets. If epoch is defined by gigaspeech, for instance, on epoch 2 we start from the start of gigaspeech, but we may be starting from the middle of the other data sources; and the only way to know where to start is to load the state of the data-loader. But that's OK.

I don't know whether there might be some opportunity for simplification or introducing simpler abstractions. I think all this cut/recording/supervision stuff can potentially get confusing. And if we're to be dealing with streams of these items (e.g. streams of Cuts) then it's not clear to me that the concept of CutSet is so useful any more? And we could just think about things that operate on cuts rather than sets of cuts? Just a thought. I'm thinking that maybe all of this recording/cut/supervision stuff might be used more during initial processing of the dataset, and when we dump the supervised segments to disk we might be able to forget about the recording stuff? Or put any info we need into the Cut directly to avoid having to sync with a manifest of Recording stuff that may not have convenient sequential access and may not really even be needed? These are quite vague thoughts as I am not familiar with all details of the current implementation.

pzelasko commented 2 years ago

Thanks for a very detailed comment, Dan!! I will distill this into specification items for myself later. I'll leave some comments below just to keep you informed about which features are already available with today's setup.

A typical scenario is that someone might want to train on 50% gigaspeech, 40% librispeech, 10% switchboard (assuming they have found a way to make the transcripts compatible), and I'm thinking that each of those datasets might have been written to disk as a bunch of xxx.tar.gz files, each one containing a small part of the dataset.

We support that already and will keep supporting it for sure. Borrowing the terminology from WebDataset, these smaller files are called "shards". See the docs here:

https://github.com/lhotse-speech/lhotse/blob/c5d0c4011caffe6ac26d790c90fde69809215bf8/lhotse/dataset/webdataset.py#L193-L202

or here:

https://github.com/lhotse-speech/lhotse/blob/c5d0c4011caffe6ac26d790c90fde69809215bf8/lhotse/dataset/webdataset.py#L118-L128

For augmentation we might want to be reading Musan data in randomly too, I suppose in principle we could just cycle through that rather than doing it as random-access. We can just randomize the order of everything at the point when we are dumping it to the tar files, to make it unnecessary to do long-range randomization when we do data loading.

Good idea.

Rather than having multiple data-loaders read in the same data and discard some, it seems to me that different dataloader processes could just handle different tar.gz files, if there are multiple such files; that would be more efficient. (I don't know whether that would easily mesh with PyTorch's concept of dataloaders though.) E.g. if we are to cycle through a dataset that's dumped in .tar.gz files called a,b,c, and there are 2 dataloaders, we could view it as alternating in a stream of files that repeats, like a b c a b c, so process 1 takes a c b a c b... and process 2 takes b a c b a c. Now, if the files are different sizes, and num-loaders and num-files are not coprime, one dataloader could "get ahead" of the other so that some data would end up being seen more often than others. But that could in principle be handled by simply ensuring during initial data processing that each archive within a dataset has about the same number of files and they have similar properties. For Librispeech we could mix in the train-clean-100 with the train-other-500 data, for example, by specifying that we sample the train-clean-100 data 5 times less frequently than train-other-500, so that we cycle through them at about the same rate. [This treats them as if they were different datasets, as they have different properties and I'm assuming we prepared them separately and did not co-mix them into tar files.] Within each dataset though, say train-clean-100, we would have to ensure that the files are "properly mixed".

I think something similar to what you're describing is already supported today. If you:

1) create a CutSet with a list of WebDataset shards:

cuts = CutSet.from_webdataset(
    ["shard1.tar", "shard2.tar", "shard3.tar", ...], 
    split_by_node=True, 
    shuffle_shards=True
)

2) construct the sampler normally, and

3) provide some extra machinery (IterableDatasetWrapper and a special worker_init_fn) to DataLoader:

train_dloader = DataLoader(
    IterableDatasetWrapper(
        dataset=train_dataset,
        sampler=train_sampler,
    ),
    batch_size=None,
    num_workers=4,
    # Note: Lhotse offers its own "worker_init_fn" that helps properly
    #       set the random seeds in all workers (also with multi-node training)
    #       and sets up data de-duplication for multi-node training with WebDataset.
    worker_init_fn=make_worker_init_fn(),
)

then the list of shards is automatically split between the nodes (2 GPUs => each sees a unique list of shards with size M = N / 2) and between the dataloader workers (4 workers => each sees a unique list of shards with size K = M / 4 = N / 8). You can also see an example of this in the following tutorial: https://colab.research.google.com/github/lhotse-speech/lhotse/blob/master/examples/02-webdataset-integration.ipynb#scrollTo=NZCG25tlDa1d

The issue with that option is that you will almost certainly end up with a different number of batches on each GPU, unless you create an infinite CutSet (cuts = cuts.repeat()) and track epochs manually (or not at all), or leverage PyTorch's Join context manager https://pytorch.org/tutorials/advanced/generic_join.html.

I think your suggestion differs in that you suggest every dataloader worker would see all of the shards, just in a different order. It's interesting, but it makes the notion of epoch problematic as you mention later.

Incidentally, the concept of an "epoch" is a little tricky when mixing multiple datasets. If epoch is defined by gigaspeech, for instance, on epoch 2 we start from the start of gigaspeech, but we may be starting from the middle of the other data sources; and the only way to know where to start is to load the state of the data-loader. But that's OK.

This is interesting; maybe we can add a mechanism to query the epoch status of each dataset separately. I'm not sure yet. It's definitely worthwhile and doable to keep track of which shards were used and how many times, and users can probably figure out the epoch info from that.

I don't know whether there might be some opportunity for simplification or introducing simpler abstractions. I think all this cut/recording/supervision stuff can potentially get confusing. And if we're to be dealing with streams of these items (e.g. streams of Cuts) then it's not clear to me that the concept of CutSet is so useful any more? And we could just think about things that operate on cuts rather than sets of cuts? Just a thought.

I also see the requirements for Lhotse shifting a little bit with time as we're tackling bigger data and streaming/lazy/iterable styles of datasets. So far my approach has been to "modernize" CutSet to behave more and more like a thin wrapper over an iterable, with some methods to be applied later to the items of that iterable. That also means the implementation of most methods on CutSet was/is being moved to Cut instead, and CutSet just aggregates the results and lets you iterate over them. I think it's still useful at this point.

I'm thinking that maybe all of this recording/cut/supervision stuff might be used more during initial processing of the dataset, and when we dump the supervised segments to disk we might be able to forget about the recording stuff? Or put any info we need into the Cut directly to avoid having to sync with a manifest of Recording stuff that may not have convenient sequential access and may not really even be needed?

For now I found a way to keep the random-access and sequential approaches compatible by loading audio only from the part of the recording that corresponds to a cut, and storing that in the tar file as a new recording. So if you had a 30-minute recording and a 5-second cut which is exported into WebDataset, the cut gets its own Recording with a "memory"-type AudioSource that contains just those 5 seconds of audio. I find it convenient because you always have the same interface to your data and metadata regardless of how it's stored and loaded into memory.
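
For completeness, the export side looks roughly like the following; a minimal sketch assuming the export_to_webdataset helper in lhotse.dataset.webdataset (exact arguments may differ):

from lhotse import CutSet
from lhotse.dataset.webdataset import export_to_webdataset

cuts = CutSet.from_file("cuts.jsonl.gz")
# Each exported cut carries its own in-memory Recording/AudioSource with just
# the span of audio it covers, so reading it back requires no random access.
export_to_webdataset(cuts, output_path="cuts.tar")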

pzelasko commented 2 years ago

Another question: should we support older PyTorch versions for this format? WebDataset was largely ported into core PyTorch (https://github.com/pytorch/data), and some of these things, like sharding filters or even tar file reading, could be used from there. But it probably means this data loading format would require PyTorch 1.12 or later.

danpovey commented 2 years ago

Regarding PyTorch version, I think 1.12 is quite recent. I'm still using 1.7, for instance. Does it become difficult to support versions both earlier than, and >=, 1.12, if using WebDataset?

My suggestion regarding splitting up the shards was in the context of an infinitely repeating data loader, and assuming the num-shards isn't exactly divisible by the number of data loader workers. So you consider the infinitely repeated list of shards, a b c a b c ..., and just partition it to the workers round-robin. An issue, though, is that after many epochs the workers could possibly become desynchronized so that they would be visiting the same data at the same time.

What would be more ideal is if there were a way to synchronize the allocation of shards, such that the rule is: a worker draws from its current shard, and when it is exhausted it gets the next shard from a shared queue of shards. [But then we have the question of how to manage that shared queue.]
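
A rough single-node sketch of that rule, using a plain multiprocessing queue (synchronizing across nodes, the hard part, is not addressed here):

import multiprocessing as mp
import queue

def worker_loop(shard_queue, process_shard):
    # Each worker draws a shard, consumes it fully, then asks for the next one.
    while True:
        try:
            shard = shard_queue.get_nowait()
        except queue.Empty:
            return
        process_shard(shard)  # e.g. open the tar file and yield its items

if __name__ == "__main__":
    manager = mp.Manager()          # owns the queue shared by the workers
    shard_queue = manager.Queue()
    for path in ["shard-000.tar", "shard-001.tar", "shard-002.tar"]:
        shard_queue.put(path)
    # In practice worker_loop would run inside each dataloader worker process.
    worker_loop(shard_queue, process_shard=print)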

If we prepared Librispeech separately per subset, for instance (because we might sometimes want to train on the 100h subset independently), then we could define Libri-960 as a rule where we take approximately 50% of cuts from the 500h set of shards, 40% from the 360h set of shards, and 10% from the 100h set of shards, and we just cycle within each of those subsets. And we could allow doing this recursively, e.g. define a rule that takes 20% Librispeech and 80% Gigaspeech.

It would be quite possible to just declare that an epoch is a certain duration of data, in seconds, so it doesn't necessarily correspond to exhausting the data. And all datasets are infinitely repeating, with the user specifying proportions to draw from various sources, e.g. libri vs. giga. Then, the training process would decide when to stop reading (after the specified num epochs) and when we're done we can just let the data loaders die if that's what they are going to do.
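
E.g., a duration-defined epoch could be as simple as the following sketch (assuming an infinite iterable of cuts, each with a .duration in seconds):

def epoch_of(cuts_iter, epoch_seconds):
    # Yield cuts until their total duration reaches epoch_seconds, then stop;
    # the caller re-invokes this on the same (infinite) iterator for the next epoch.
    total = 0.0
    for cut in cuts_iter:
        yield cut
        total += cut.duration
        if total >= epoch_seconds:
            return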

I'm thinking that, when constructing the machinery for reading data in, it might be best to do it in a very generic way: not as a CutSet necessarily but as some more generic kind of lazily loaded dict where there could be (dataset/data-source)-specific ways of loading certain keys. E.g. when someone tries to access item.features, for some datasets we read the features from disk; or for other datasets they might be computed. But that would be done using some fairly generic mechanism, where the user had specified that "features" is loaded by some function (when you call it the first time; and then it's cached in the item).
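
Something in the spirit of the following, perhaps (just a sketch of the generic mechanism, no Lhotse classes involved):

import numpy as np

class LazyItem:
    # A bag of fields where each field is produced by a loader function
    # on first access and cached afterwards.
    def __init__(self, loaders):
        self._loaders = loaders
        self._cache = {}

    def __getattr__(self, key):
        # Only called when normal attribute lookup fails, i.e. for data fields.
        if key not in self._cache:
            self._cache[key] = self._loaders[key]()
        return self._cache[key]

item = LazyItem({"features": lambda: np.zeros((100, 80), dtype=np.float32)})  # stand-in loader
feats = item.features  # computed on first access, cached afterwards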

The difficulty with Lhotse right now is that it isn't underpinned by simple abstractions, which makes it quite hard for users to discover features. This is my fault, of course, for not coming up with simple enough ideas in the first place.

I'm thinking of a general workflow for the data loaders where they: read in items from one or more data sources, validate them, shuffle, group them into batches, and so on.

If we are going to make all data sources "infinite"/repeating, so that there is no concept of calling .size() on these things, I'm thinking all of these stages could be iterable objects. So the "read in items" stage is some data source that returns a LazyDict object when you call next() on it; then the "validator" reads from that source, runs the validate() function, and is itself an iterator from which you can get the LazyDict objects; the same with shuffle(); then the 'group' thing would, I suppose, just return lists of LazyDict objects, presumably as directed by a user-specified function; and so on. Perhaps at the end, after batching the items, the iterator would return an actual Python dict. I'm thinking that these iterators would, in general, take iterators in their constructors.

So, something like:

data_source1 = ...  # some object that is going to return basic items one by one
data_source2 = ...  # some object that is going to return basic items one by one
combiner = Combiner([(data_source1, 0.2), (data_source2, 0.8)])  # draws according to specified proportions
validator = Validator(combiner, validate_func)
shuffler = Shuffler(validator, 1000)
# ... and so on.
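
A minimal concrete sketch of the Combiner piece (class and argument names follow the pseudocode above; none of this is existing Lhotse or PyTorch API):

import random

class Combiner:
    # Draws the next item from one of the source iterators, chosen at random
    # in proportion to the given weights.
    def __init__(self, sources_and_weights, seed=0):
        self.sources = [iter(src) for src, _ in sources_and_weights]
        self.weights = [w for _, w in sources_and_weights]
        self.rng = random.Random(seed)

    def __iter__(self):
        return self

    def __next__(self):
        (source,) = self.rng.choices(self.sources, weights=self.weights, k=1)
        return next(source)  # assumes infinite sources; otherwise StopIteration ends the stream

Validator and Shuffler would be written the same way: iterators that wrap another iterator and filter, transform, or buffer its items.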

I haven't yet thought about how we could get the "state" of this process, though [necessary for restarting training runs], or about the practicalities of transmitting this information to the data-loader process.

Anyway these are just some thoughts. In the end Piotr will have to decide what the best approach is.


pzelasko commented 2 years ago

This is super helpful to organize my thinking for this effort, thanks!

The iterable-based mechanism you described is exactly what the PyTorch team is building with torchdata (see https://github.com/pytorch/data). I was already able to validate that we can leverage it for loading data from multiple tar archives + JSONL metadata and combining them together. The only issue is that the first release was for PyTorch 1.11 and the project is still not stable (for 1.12 they already broke some backwards compatibility), so adopting this mechanism means the new Lhotse dataloading would require a recent version of PyTorch. I think they are also interested in the capability of resuming training in that project (they already have some machinery to construct a graph representation of the data pipeline). It's not there yet today (AFAIK), but it means we'll end up trying to solve the same problems.
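
For example, with torchdata (~0.4) the tar-reading part could look roughly like this (a sketch based on its FileLister/FileOpener/TarArchiveLoader datapipes; exact functional names may differ between releases):

from torchdata.datapipes.iter import FileLister, FileOpener

dp = FileLister("my_data_dir/", masks="*.tar")  # list the shard files
dp = dp.shuffle()                               # shuffle the order of shards
dp = dp.sharding_filter()                       # split shards across workers/nodes
dp = FileOpener(dp, mode="b").load_from_tar()   # yields (path, stream) per tar member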

One more note on older PyTorch versions: based on the code I already have, I think it's possible to support this effort without torchdata and without WebDataset (I have my own simplified version of tar writers/readers). I'm just not sure if it makes sense to duplicate their other capabilities yet (maybe it does).

Regarding abstractions, I tend to agree they got a bit too complex and can be confusing sometimes. Lhotse has grown quite a lot during the last 2 years and tries to support everything in every possible combination. The only way to simplify is to break backwards compatibility, and so far my goal was the reverse: to stay 99.9% backward-compatible with the initial design choices. Maybe it's worth a separate discussion of what Lhotse 2.0 should look like :) I agree that the core focus should be on iteration-based approaches.

For combining shards the way you described, I'm pretty sure it's possible to do today, but maybe we can somehow make the API more discoverable. Code (also see a tutorial I wrote about combining datasets: https://colab.research.google.com/github/lhotse-speech/lhotse/blob/master/examples/03-combining-datasets.ipynb):

cuts_libri_100h = CutSet.from_webdataset([libri_100h_shard0, libri_100h_shard1, ...])
cuts_libri_360h = ...
cuts_libri_500h = ...
cuts_giga = ...

cuts_combined = CutSet.mux(
    CutSet.mux(
        cuts_libri_100h, cuts_libri_360h, cuts_libri_500h,
        weights=[0.1, 0.4, 0.5],
    ),
    cuts_giga,
    weights=[0.2, 0.8],
).shuffle(buffer_size=50000)  # you want the buffer to be as large as possible within CPU RAM limits

Regarding the features that are either precomputed or computed on the fly when somebody accesses item.features: I honestly think this needs to be implemented inside the data pipeline/iterator rather than inside attribute access, otherwise the feature computation will take up time inside the training loop. But it's something that can definitely be improved, as it's not easily achievable with Lhotse today.
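
For reference, on-the-fly feature extraction can already be plugged into the data pipeline itself today via an input strategy (a sketch; the dataset class here is just one example):

from lhotse import Fbank
from lhotse.dataset import K2SpeechRecognitionDataset, OnTheFlyFeatures

dataset = K2SpeechRecognitionDataset(
    input_strategy=OnTheFlyFeatures(Fbank()),  # features are computed in the dataloader workers
)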

The shard queue idea is... interesting. Let me think about it. One immediate issue is distributed training, where it's almost impossible to synchronize without an external service (I don't think we want to go that way, though).

danpovey commented 2 years ago

Regarding someone calling item.features: I wasn't imagining that this would happen in the training loop. I was thinking it would happen in the dataloader (and we'd convert to a plain dict before sending to the training process), called by some user-specified function, for, say, processing items prior to combining them into batches.
Yes, IDK how one might organize the shard-queue thing. One possibility is to have a background thread in the training process open a port for that? Gets ugly though, obviously.