lhotse-speech / lhotse

Tools for handling speech data in machine learning projects.
https://lhotse.readthedocs.io/en/latest/
Apache License 2.0

Duplicate Manifest ID / Mux #1271

Open m-wiesner opened 5 months ago

m-wiesner commented 5 months ago

I am probably using some features wrong, but I had a cut set containing data in a number of different languages, and I wanted to sample each language according to some weight. I did the following:

from itertools import groupby

from lhotse import CutSet

# Group cuts by the language of their first supervision.
datasets = groupby(
    sorted(train_cuts.to_eager(), key=lambda x: x.supervisions[0].language),
    lambda x: x.supervisions[0].language,
)

# One shuffled, eager CutSet per language.
manifests = [CutSet.from_cuts(ds[1]).to_eager().shuffle() for ds in datasets]

total = sum(len(m) for m in manifests)

# Sampling weights: the exponent < 1 flattens the distribution, so
# low-resource languages are sampled more often than their raw share.
train_cuts = CutSet.infinite_mux(
    *manifests,
    weights=[(len(m) / total) ** 0.3 for m in manifests],
)

The only reason I used to_eager() on the cut sets was so that I could get the length of each per-language sub-cutset, which I used to compute the weights. If I run the exact same code on train_cuts without these extra lines, everything works fine. The only other modification I made was removing the loop over epochs, since my understanding is that there is no real notion of an epoch with infinite_mux(). I am using the resulting train_cuts with the DynamicBucketingSampler, and I'm not totally clear on whether that is a good thing to do. My guess as to what caused the error is that I ended up sampling the same cut twice, and at some point it lands in the same minibatch and triggers this assertion error about duplicate manifest IDs. Any help is much appreciated!
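For reference, the sampler side of my setup looks roughly like this (sketched from my script; the max_duration value here is just a placeholder, not my actual setting):

from lhotse.dataset import DynamicBucketingSampler

# train_cuts is the infinite stream from infinite_mux() above, so the
# sampler never exhausts it; training just runs for however many steps
# the loop takes, rather than for a fixed number of epochs.
train_sampler = DynamicBucketingSampler(
    train_cuts,
    max_duration=200.0,  # placeholder batch size (seconds of audio)
    num_buckets=10,
)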

The error I get is below: a duplicated manifest ID. It happens after multiple successful minibatch updates, as far as I can tell.

    train_one_epoch(
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/icefall/egs/radio/language_id/train_mean_prop.py", line 532, in train_one_epoch
    for batch_idx, batch in enumerate(train_dl):
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/anaconda/envs/python3.8_torch2.0/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 634, in __next__
    data = self._next_data()
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/anaconda/envs/python3.8_torch2.0/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1346, in _next_data
    return self._process_data(data)
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/anaconda/envs/python3.8_torch2.0/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1370, in _process_data
    self._try_put_index()
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/anaconda/envs/python3.8_torch2.0/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 1352, in _try_put_index
    index = self._next_index()
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/anaconda/envs/python3.8_torch2.0/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 624, in _next_index
    return next(self._sampler_iter)  # may raise StopIteration
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/lhotse/lhotse/dataset/sampling/base.py", line 281, in __next__
    batch = self._next_batch()
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/lhotse/lhotse/dataset/sampling/dynamic_bucketing.py", line 261, in _next_batch
    batch = next(self.cuts_iter)
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/lhotse/lhotse/dataset/sampling/dynamic_bucketing.py", line 408, in __iter__
    batch = next(iter(batcher))
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/lhotse/lhotse/dataset/sampling/dynamic.py", line 269, in __iter__
    yield self._collect_batch()
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/lhotse/lhotse/dataset/sampling/dynamic.py", line 345, in _collect_batch
    return detuplify(cuts)
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/lhotse/lhotse/dataset/sampling/dynamic.py", line 281, in detuplify
    cuts = CutSet.from_cuts(cs[0] for cs in cuts)
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/lhotse/lhotse/cut/set.py", line 310, in from_cuts
    return CutSet(cuts=index_by_id_and_check(cuts))
  File "/expscratch/mwiesner/scale23/scale2023/icefall/tools/lhotse/lhotse/utils.py", line 710, in index_by_id_and_check
    assert m.id not in id2man, f"Duplicated manifest ID: {m.id}"
AssertionError: Duplicated manifest ID: 12709492511372685294-194417

pzelasko commented 5 months ago

You can quickly fix that with the same effect by replacing infinite_mux(*cuts, ...) with mux(*[c.repeat() for c in cuts], ...).
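Applied to your snippet, that would look something like this (a sketch; manifests and total are the variables from your post):

# mux() assigns each input exactly one slot, so no cut set is drawn
# into the stream twice; repeat() makes each per-language CutSet
# effectively infinite, so the combined stream never exhausts, just
# like infinite_mux().
train_cuts = CutSet.mux(
    *[m.repeat() for m in manifests],
    weights=[(len(m) / total) ** 0.3 for m in manifests],
)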

The issue comes from the fact that infinite_mux samples its sources with replacement, so it's entirely possible that if you have 20 cut sets, infinite_mux samples the same cut set for more than one slot. It is really intended for scenarios where you have a very large number of inputs (in the thousands: either a lot of datasets, or sharded datasets where each cut set comes from a single shard) and opening that many files at once is not possible on your OS (because there may be N GPUs x M dataloader workers, etc.). If you don't hit the limit on open file descriptors, you should always use mux.
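For illustration, the kind of setup infinite_mux() is meant for looks more like this (hypothetical; shard_paths stands in for thousands of per-shard manifests):

# Thousands of shard-level manifests. Lazy cut sets only open their
# file when iterated, and infinite_mux() samples sources with
# replacement rather than holding every input open at once, which
# keeps the process under the OS file-descriptor limit.
shard_paths = [f"cuts.{i:06d}.jsonl.gz" for i in range(5000)]  # hypothetical paths
shards = [CutSet.from_jsonl_lazy(p) for p in shard_paths]
train_cuts = CutSet.infinite_mux(*shards)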

As a separate note, maybe it makes sense to drop the requirement of no duplicated IDs in a CutSet... more and more often I'm running into cases where it's more annoying than useful.