rwth-i6 / returnn

The RWTH extensible training framework for universal recurrent neural networks
http://returnn.readthedocs.io/

Dataset post processing #1505

Closed albertz closed 2 months ago

albertz commented 8 months ago

Examples of post-processing:

Some datasets already have partial support for post processing. Examples:

There was the idea about storing generic raw audio (or maybe even Ogg) inside the HDFDataset. And similarly, there was also the idea about storing the text (UTF8 bytes) inside the HDFDataset. In both cases, you would then maybe want to transform those into audio features or BPE labels on-the-fly as part of the dataset.

There are multiple options how to implement this:

JackTemaki commented 8 months ago

I would be very interested in this functionality, but I have not yet put much thought into how this would look best. I am definitely a fan of using MetaDataset for everything, so I would not mind if it were part of that.

curufinwe commented 8 months ago

Commenting on your 3 suggestions in order:

  1. Not very scalable, having to add this to every dataset makes it unnecessarily complicated.
  2. The dataset classes contain more logic than just returning a given sequence. They also contain logic for sequence ordering etc. This is not needed for a postprocessing pipeline. One could make a dataset for post-processing which takes an arbitrary function as input, but then you still have to nest your datasets correctly and the dataset dict will have a lot of nesting. That's not nice, but maybe of secondary concern.
  3. I like the idea of having a separate postprocessing function, applied to the dataset output independently of the dataset, the most. It's orthogonal to the data-loading logic and does not make the dataset dict more complicated than it is right now. The only disadvantage I can see is that it adds a new global key to the config that users need to be aware of in order to use it. They don't find out about this by just browsing through the list of available datasets or the documentation of a particular dataset.

albertz commented 8 months ago

  1. Not very scalable, having to add this to every dataset makes it unnecessarily complicated.

Well, you would factor this out, to have the logic in some common class PostProcessing, and it would be very easy to integrate this into an existing dataset, similar to how Vocabulary or ExtractAudioFeatures are currently also easy to integrate. And as a starting point, maybe only HDFDataset, OggZipDataset and/or MetaDataset would have this.

But yes, I kind of agree.

  2. The dataset classes contain more logic than just returning a given sequence. They also contain logic for sequence ordering etc.

Well, we do use combinations/transformations of datasets already, e.g. see MetaDataset, CombinedDataset, ConcatSeqsDataset, etc. Only one of the datasets performs the sequence ordering then, and the other datasets will be given a predefined sequence order list. The API was specifically extended to allow that (init_seq_order(self, epoch=None, seq_list=None, seq_order=None)) and all datasets we use support this well.
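
The delegation pattern described here can be sketched with toy classes. This is only an illustration, not RETURNN code: ToyDataset and ToyWrapperDataset are hypothetical stand-ins, and only the init_seq_order(epoch, seq_list, seq_order) signature is taken from the comment above. One dataset decides the order, the others are handed that order as a predefined seq_list.

```python
import random
from typing import Dict, List, Optional


class ToyDataset:
    """Hypothetical stand-in for a dataset; not a RETURNN class."""

    def __init__(self, data: Dict[str, List[int]]):
        self.data = data  # seq tag -> sequence
        self.order: List[str] = []

    def init_seq_order(self, epoch: Optional[int] = None, seq_list=None, seq_order=None) -> bool:
        # seq_order is kept only to mirror the signature; this sketch ignores it.
        if seq_list is not None:
            # A predefined order was given from outside: just follow it.
            self.order = list(seq_list)
        else:
            # This dataset decides the order itself (shuffle seeded by the epoch).
            self.order = sorted(self.data)
            random.Random(epoch).shuffle(self.order)
        return True

    def get_seq(self, idx: int) -> List[int]:
        return self.data[self.order[idx]]


class ToyWrapperDataset:
    """Combines several datasets; only the first one performs the ordering."""

    def __init__(self, datasets: Dict[str, ToyDataset]):
        self.datasets = datasets

    def init_seq_order(self, epoch: Optional[int] = None, seq_list=None, seq_order=None) -> bool:
        names = list(self.datasets)
        control = self.datasets[names[0]]
        control.init_seq_order(epoch=epoch, seq_list=seq_list)
        for name in names[1:]:
            # All other datasets get the predefined order of the control dataset.
            self.datasets[name].init_seq_order(epoch=epoch, seq_list=control.order)
        return True

    def get_seq(self, idx: int) -> Dict[str, List[int]]:
        return {name: ds.get_seq(idx) for name, ds in self.datasets.items()}
```

A post-processing wrapper would sit in the same position as ToyWrapperDataset: it forwards the ordering call to the wrapped dataset and only transforms what get_seq returns.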

So, such postprocessing logic would not really add anything new there. It fits very naturally into how MetaDataset and co. currently work.

  3. separate postprocessing function ... Only disadvantage I can see is that it adds a new global key

There is also another aspect which becomes ambiguous: the extern_data. Does this describe the data before or after the post-processing? I.e. does this describe the data which comes out of the dataset, or the data which goes into the network/model? I think there are multiple places in our code which assume this to be the same. E.g. we have some checks that the dataset output is compatible with extern_data. And then, we create such extern_data with the actual batched data to feed it to the model (in TF via feeding the extern_data placeholders, in PT just directly).

Note, we also have the model_outputs, which describes what comes out of the model. So maybe, if there is a custom post-process function, it requires that model_inputs is also defined, and extern_data describes the dataset output? I don't know... Or if the user does not specify model_inputs, it would assume the same format as extern_data (for many post-processing functions, e.g. speed perturbation etc, it would not change the format of the data).

curufinwe commented 8 months ago

Regarding 1: OK, I guess we agree not to do it this way.

Regarding 2: My main worry is that readability will suffer if we have too many nested datasets, i.e. a PostProcessingDataset wrapping a MetaDataset wrapping multiple HDFDatasets.

Regarding 3: extern_data should describe the data after the post-processing, I think. It's the only type/shape that the model sees. Regarding the checks: I guess they need to be reworked, but there shouldn't be many places where that happens, right?

albertz commented 8 months ago

@JackTemaki argued that he (and many others) use MetaDataset anyway, so if this is part of MetaDataset, not much would change in terms of config complexity for them.

Regarding extern_data, yes, probably you are right, and mostly extern_data describes the input to the model, and there are not many places where it checks that the dataset output matches it.

So, what would that variant look like? Just a function in the config, like:

def dataset_post_process(data: TensorDict) -> TensorDict: ...

? But we have the train dataset, dev/eval datasets, and potentially also other datasets (forwarding, search, some custom user script, ...). Should the same post processing be used in all cases? Or should this be a property per dataset?

curufinwe commented 8 months ago

We definitely need to distinguish train and dev datasets. If we do data augmentation for training, we don't necessarily want to do it for cross-validation. If we are doing some sort of format conversion, then it would be needed for both. So in the end this should be a user choice. We can also have the type of dataset (train/dev/...) as one argument to the post-processing function.

albertz commented 8 months ago

How would the user specify such post-processing function per dataset?

It could be another argument for the dataset itself, so the user specifies it like:

train = {
   ...,
   "post_process": my_train_dataset_post_proc,
}
dev = {
   ...,
   "post_process": my_dev_dataset_post_proc,
}

It's a bit ugly, because Dataset itself then would not use this attribute at all; only outside code would use it. But on the other hand, if the user specified it somehow outside of the dataset, as a separate config option, this could easily lead to errors, as you don't necessarily have a well-defined name for the dataset. E.g. like:

dataset_post_process_funcs = {
    "train": my_train_dataset_post_proc,
    "dev": my_dev_dataset_post_proc,
}

This is maybe fine for the training task, but for search or forward, it's ambiguous, and also it doesn't really work if RETURNN is used for scripting and not as a standalone tool.

curufinwe commented 8 months ago

Could we add this to (every) engine class? The engine knows what kind of task it performs and what dataloader it uses for that task and could pick the correct post-processing function for that task.

albertz commented 8 months ago

The post-processing function is not per task but per dataset. At least that is what I wrote above.

Or do you want to have it per task? But I guess you don't really want it per task, but rather depending on whether you train or eval? Or maybe a generic dataset_post_process_func, but there is always an additional extra argument train, which specifies the train flag (similar to how dropout etc. work, depending on this flag)? So like this:

def dataset_post_process(data: TensorDict, *, train: bool = False) -> TensorDict: ...

curufinwe commented 8 months ago

Sorry, I was not precise enough. What I meant was that in the engine class you know for what the dataset is used and from which name in the config it comes from (I hope). Then one can select the correct postprocessing function to go with it (by picking it from the dict you showed above with the correct key). But that might indeed be too complicated, and having the train flag could already be enough.

albertz commented 8 months ago

in the engine class you know for what the dataset is used and from which name in the config it comes from (I hope)

No, you don't. E.g. we have this API for forward:

def forward_with_callback(self, *, dataset: Dataset, callback: ForwardCallbackIface): ...

Or this API for the init (including training):

    def init_train_from_config(
        self,
        config: Optional[Config] = None,
        train_data: Optional[Dataset] = None,
        dev_data: Optional[Dataset] = None,
        eval_data: Optional[Dataset] = None,
    ): ...

It is handled by our __main__ to init the datasets here (train_data etc). (Although, it's a bit inconsistent. The engine anyway has a reference to the config, and in init_train_from_config, it also checks eval_datasets directly from the config, to potentially init further eval datasets.)

But yes, so I guess we can simply use this API:

def dataset_post_process(data: TensorDict, *, train: bool = False) -> TensorDict: ...

curufinwe commented 8 months ago

OK, let's use def dataset_post_process(data: TensorDict, *, train: bool = False) -> TensorDict: ...
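
To illustrate how the train flag separates format conversion (always applied) from augmentation (training only), here is a minimal sketch. It uses a plain dict of Python lists as a stand-in for TensorDict so that it runs without RETURNN. The "data" key, the int16-style scaling, the random gain and the extra rng argument are all assumptions made for this example.

```python
import random
from typing import Dict, List, Optional

# Stand-in for RETURNN's TensorDict, to keep the sketch self-contained.
ToyTensorDict = Dict[str, List[float]]


def dataset_post_process(
    data: ToyTensorDict, *, train: bool = False, rng: Optional[random.Random] = None
) -> ToyTensorDict:
    out = dict(data)  # shallow copy, the input dict is left untouched
    # Format conversion: needed for training and evaluation alike.
    out["data"] = [x / 32768.0 for x in data["data"]]
    if train:
        # Augmentation: only in training. A random gain serves as a placeholder
        # for something like speed perturbation.
        rng = rng or random.Random()
        gain = rng.uniform(0.8, 1.2)
        out["data"] = [x * gain for x in out["data"]]
    return out
```

With train=False the function is deterministic, which is what one would want for cross-validation.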

albertz commented 8 months ago

One aspect I realized now: Where exactly would this be executed? As this is now outside the dataset, MultiProcDataset cannot really make use of this, so it cannot be parallelized over the workers of MultiProcDataset. That's a bit suboptimal, at least for my setup. But it can still run somewhere inside the PyTorch pipeline, i.e. then be parallelized over the PyTorch DataLoader worker instances, although most people don't parallelize there (num_workers 0 or 1).

vieting commented 7 months ago

I would also be interested in this feature. The discussed post processing solution seems fine to me. However, I would definitely like the post processing to be parallelizable across multiple procs. At least now, I have a setup with an OggZipDataset using pre_process which is wrapped by a MultiProcDataset. The MultiProcDataset gave a clear improvement in terms of computing time and therefore a general speedup of training, so this would be very helpful. Maybe we could additionally allow specifying dataset_post_process_opts, which could contain similar args as the MultiProcDataset takes and would automatically apply a similar multi-processing logic.

albertz commented 6 months ago

Another aspect came up (@Judyxujj): We were interested in implementing mixup in this post processing function. But this is not really possible with the current design. This additionally needs:

(Note, I have a mixup implementation, but I did it inside the model, via a nn.Module (or rf.Module). The state is handled in the same way as e.g. nn.BatchNorm.)

NeoLegends commented 2 months ago

If we limited this feature to PyTorch we could also offer the user to inject custom DataPipes into the already existing pipeline instead of providing a callback-style API. That would interact favourably with multiprocessing, offer places to store state for e.g. Mixup, and just as well allows syncing data across workers, maybe given some setup or communication primitive to bootstrap further communication through.

@vieting Are you using torch or TF in your current setups?

albertz commented 2 months ago

That would interact favourably with multiprocessing,

No, not really. Only the DataLoader multiproc would apply here, which is usually just a single proc. But we want to have multiple procs here.

offer places to store state for Mixup,

I don't understand. Where? How? I don't think that a data pipe should really have state (except for temporary state which we would reset every subepoch).

Having state also means that you properly store/restore the state on disk after a restart, like the model parameters or optimizer state.

and just as well allows syncing data across workers,

No, how? Every worker on the dataset is independent from each other. They don't have any way to communicate with each other.

maybe given some setup or communication primitive.

I don't see any simple way to add this on this level.

In any case, we should probably not overthink/overengineer this. E.g., for things like mixup, I think it's ok if the state is reset at the beginning of an epoch, and also if it's just local to the current worker. Otherwise mixup can just be done on the model level, which we have already implemented. And most other things you would want to do in such post processing don't have state.

Also, I tend to think, it's ok to have multiple solutions here, and to see what is easiest for the user.

NeoLegends commented 2 months ago

No, not really. Only the DataLoader multiproc would apply here, which is usually just a single proc. But we want to have multiple procs here.

I was under the assumption that in RETURNN+PT the data loader num_workers is basically a replacement for MultiProcDataset. I.e. in the cases where I want to use more than one core for data loading I'd set num_workers > 1 to shard the load across cores. Is this wrong?

albertz commented 2 months ago

I was under the assumption that in RETURNN+PT the data loader num_workers is basically a replacement for MultiProcDataset. I.e. in the cases where I want to use more than one core for data loading I'd set num_workers > 1 to shard the load across cores. Is this wrong?

Yes, this is likely wrong. We never really tested this, but: There is no sharding implemented for multiple DataLoader workers. It cannot be: There is no way to do sharding in general for any dataset (or only in an inefficient way, by iterating through all data and discarding what you don't want). The only dataset which can do sharding properly is DistributeFilesDataset. But I'm pretty sure it would also not work properly with multiple DataLoader workers, as we never implemented anything special for that. I think (but I'm also not sure) every worker should get a different random seed, and thus get different data, similar to multi-GPU training. Or maybe not even that, and it's just not properly handled.

But even with DistributeFilesDataset, I'm not sure if that works well together. DistributeFilesDataset assumes that it does the sharding, and that it is not itself already sharded. It definitely needs some extra logic for that. I'm also not really sure what happens when you reach the end in one worker. Does it stop immediately, or does it continue until all workers have no data anymore?

On the other hand, MultiProcDataset properly handles that. It first gets a list of all sequences of the current (sub)epoch, and then distributes that list among all the workers.
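
As a rough illustration of "get the full list, then distribute it", the following sketch splits a sequence list among workers. The round-robin strategy and the function name distribute_seqs are assumptions for this example; the comment above only states that the list is distributed, not how.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def distribute_seqs(seq_list: Sequence[T], num_workers: int) -> List[List[T]]:
    """Splits seq_list round-robin, so worker i gets items i, i + n, i + 2n, ..."""
    if num_workers < 1:
        raise ValueError("num_workers must be >= 1")
    return [list(seq_list[i::num_workers]) for i in range(num_workers)]
```

Because the split happens on the sequence list, every sequence is assigned to exactly one worker, and no worker has to read and discard data that belongs to another one.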

albertz commented 2 months ago

Btw, after some discussion yesterday with @curufinwe, I think a pragmatic simple solution for now is really to implement this as a new separate dataset, like this PostProcessingDataset. This directly solves the issues with parallelization of the post processing (it lives together with the dataset, so covered by those multiple workers) and it doesn't really require any extra logic in RETURNN, so it is the simplest solution with the least amount of complexity. The only downside is that this makes the config maybe harder to read, but I'm not sure if this is really such a big issue.
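
For illustration, such a nested config could look roughly like the sketch below. The option key "post_process_seq", the function my_post_process_seq and the file path are placeholders made up for this example, and the other keys should be checked against the documentation of the respective dataset classes. The point is only the nesting: the post-processing dataset sits inside MultiProcDataset, so it runs in the worker procs.

```python
def my_post_process_seq(data):
    """Hypothetical per-sequence transform; here just a no-op."""
    return data


train = {
    "class": "MultiProcDataset",
    "num_workers": 4,
    "buffer_size": 10,
    "dataset": {
        # Runs inside each MultiProcDataset worker, so it is parallelized.
        "class": "PostprocessingDataset",
        # Placeholder option name; the actual class may name this differently.
        "post_process_seq": my_post_process_seq,
        "dataset": {
            "class": "HDFDataset",
            "files": ["train.hdf"],  # placeholder path
        },
    },
}
```

This is three levels deep, which is the readability cost mentioned above.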

NeoLegends commented 2 months ago

With those points I agree this is the simplest way to move forward, thanks for the explanations!

albertz commented 2 months ago

For some other examples of similar processing datasets, see: VariableDataset, MetaDataset, AnythingDataset, ConcatSeqsDataset.

Btw, in the main post, I extended the list of example post processing functions a bit. One important type is also to support concatenating sequences (see #1573). I.e. it means the post processing transformation does not necessarily operate only on a single individual sequence, but could also do things like concatenating sequences, maybe shuffling sequences, dropping sequences, inserting new sequences, etc. However, this should be implemented in a streaming way, i.e. it gets in a sequence of TensorDict, and should output a new sequence of TensorDict.

albertz commented 2 months ago

However, this should be implemented in a streaming way, i.e. it gets in a sequence of TensorDict, and should output a new sequence of TensorDict.

The question is then how to define the API for the user.

Before, we suggested:

def dataset_post_process(data: TensorDict, *, train: bool = False) -> TensorDict: ...

(This would now be an argument for our PostProcessingDataset.) (The train flag is then also not necessary anymore.)

Maybe we can still also provide this simpler API, as in many cases, the user wants to transform a single TensorDict only. So maybe the option post_process_seq: Callable[[TensorDict], TensorDict] or so.

But then, it should also support the operations over multiple TensorDicts. This could be such an option/API:

post_process_stream: Callable[[Iterator[TensorDict]], Iterator[TensorDict]]

The user can simply implement this as a generator, like so:

from typing import Iterator
from returnn.tensor import TensorDict

def no_op_post_process_stream(input_stream: Iterator[TensorDict]) -> Iterator[TensorDict]:
    for data in input_stream:
        yield data
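
A slightly less trivial sketch of the stream API: concatenating every two consecutive sequences, plus a helper that lifts a per-sequence function into a stream function. Plain dicts of lists stand in for TensorDict so that the example runs without RETURNN; concat_pairs_stream and lift_seq_func are hypothetical names.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

# Stand-in for RETURNN's TensorDict, to keep the sketch self-contained.
ToyTensorDict = Dict[str, List[Any]]


def concat_pairs_stream(input_stream: Iterator[ToyTensorDict]) -> Iterator[ToyTensorDict]:
    """Concatenates every two consecutive sequences, key by key."""
    pending: Optional[ToyTensorDict] = None
    for data in input_stream:
        if pending is None:
            pending = data  # hold back until the partner sequence arrives
            continue
        yield {key: pending[key] + data[key] for key in pending}
        pending = None
    if pending is not None:
        # Odd number of sequences: pass the last one through unchanged.
        yield pending


def lift_seq_func(
    post_process_seq: Callable[[ToyTensorDict], ToyTensorDict],
) -> Callable[[Iterator[ToyTensorDict]], Iterator[ToyTensorDict]]:
    """Turns a per-sequence function into a stream function."""

    def post_process_stream(input_stream: Iterator[ToyTensorDict]) -> Iterator[ToyTensorDict]:
        for data in input_stream:
            yield post_process_seq(data)

    return post_process_stream
```

The second helper shows that the per-sequence API is a special case of the stream API, so supporting both does not need two separate code paths.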

albertz commented 2 months ago

The PostprocessingDataset from #1596 is merged now. It should make all of the examples discussed here possible.

Except that it should not have state, but that's not really much of a problem. You can still implement something like mixup. You should only reset any internal state at the beginning of an epoch.
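
As a sketch of what "state that is reset at the beginning of an epoch" can look like, here is a simplified mixup-like stream function. It assumes the stream function is called once per (sub)epoch, so the buffer is a local variable and starts empty each time. Plain dicts of float lists stand in for TensorDict, only the "data" key is mixed (targets are left alone), and the names mixup_stream, buffer_size, lam and rng are made up for this example.

```python
import random
from collections import deque
from typing import Deque, Dict, Iterator, List, Optional

# Stand-in for RETURNN's TensorDict, to keep the sketch self-contained.
ToyTensorDict = Dict[str, List[float]]


def mixup_stream(
    input_stream: Iterator[ToyTensorDict],
    *,
    buffer_size: int = 4,
    lam: float = 0.7,
    rng: Optional[random.Random] = None,
) -> Iterator[ToyTensorDict]:
    rng = rng or random.Random()
    # Local state: created fresh on every call, so nothing survives an epoch
    # and nothing is shared between workers.
    buffer: Deque[List[float]] = deque(maxlen=buffer_size)
    for data in input_stream:
        audio = data["data"]
        if buffer:
            other = rng.choice(list(buffer))
            n = min(len(audio), len(other))
            # Mix the overlapping part, keep the rest of the current sequence.
            mixed = [lam * a + (1.0 - lam) * b for a, b in zip(audio[:n], other[:n])]
            mixed += list(audio[n:])
        else:
            mixed = list(audio)  # nothing buffered yet: pass through
        buffer.append(list(audio))  # buffer the clean signal, not the mixed one
        yield {**data, "data": mixed}
```

Since the buffer lives only inside one generator call, nothing needs to be stored to or restored from disk after a restart.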