iterative / dvc

🦉 Data Versioning and ML Experiments
https://dvc.org
Apache License 2.0
13.82k stars 1.18k forks source link

Incremental processing or streaming in micro-batches #331

Closed kskyten closed 3 years ago

kskyten commented 6 years ago

It seems like it is only possible to replace a dataset entirely and then re-run the analysis. Incremental processing would enable more efficient processing by avoiding recomputation. Here's how Pachyderm does it.

dmpetrov commented 6 years ago

Hi @kskyten Could you please give us a bit more details? Are you talking about deep learning style computations when a result of each epoch will be saved and then, possible, reused? If so, how do you save the results - as a single file or a set of files in an input directory (like in Pachyderm docs)?

kskyten commented 6 years ago

Let's say you have a folder of images and you define a computationally intensive pipeline for them. After having defined this pipeline you acquire new images that you want to process. This corresponds to the inter-datum incrementality in Pachyderm, if I understood correctly.

dmpetrov commented 6 years ago

We are going to implement Dependency to directory #154 which is a little bit related to this incremental feature. But it is not enough.

This is a good scenario. Let's keep this feature request - it will be implemented. We just need to decide a priority.

ghost commented 5 years ago

@dmpetrov , are we there yet or we should keep this open?

efiop commented 5 years ago

@mroutis Yep, this should work now by using dvc unprotect: https://dvc.org/doc/user-guide/how-to/update-tracked-files

Closing, please feel free to reopen.

dmpetrov commented 5 years ago

Guys, this is actually a request for streaming\micro-batch processing.

Scenario: I have a dir with images and I'd like to process each image with dwt_ll4.py to generate a spectrum matrixes. If new 23 images were added I'd like to run the script only for these 23 images.

It is mostly data engineering scenario, not data science. So, we should think carefully if DVC should handle this or not. But it is still a valid case and it will be especially useful when we implement multi-processing #755.

So, I'm renaming and reopening.

ghost commented 5 years ago

Thanks, @dmpetrov

Pierre-Bartet commented 4 years ago

Hi, has this eventually been implemented ?

pared commented 4 years ago

Hi @Pierre-Bartet, it have not been yet implemented. Would you find this feature useful? Could you describe your use case?

Pierre-Bartet commented 4 years ago

@pared Use cases equivalent to the one already described:

Scenario: I have a dir with images and I'd like to process each image with dwt_ll4.py to generate a spectrum matrixes. If new 23 images were added I'd like to run the script only for these 23 images.

In fact this is exactly at this point that data version control tools could become really useful because the situation is messier than in the 'vanilla' use case. AFAIK Pachyderm is the only tool with this kind of feature but it is super heavy. I think what is needed is to be able to

So that a dvc repro would only run on needed files.

The hard part is that:

$ dvc run --d my_collection computation.py

could both mean:

So I guess the '--d' should be replaced by something else to distinguish between those two cases.

pared commented 4 years ago

could both mean

  • 'apply computation.py separately on each object of the collection' -> If for example, we want to subtract the mean of each image from each pixel
  • 'apply computation.py on the whole collection' -> If for example, we want to subtract the global mean of all the images from each pixel

I don't think that distinguishing whether to apply a given code on each image or all images should be DVC responsibility. DVC is an abstraction that takes care of tracking code and its dependencies. But there are too many use cases of DVC (any data science project, so data can range from CSV to folder full of files). This use case(apply on image vs dataset) assumes that DVC is aware of code logic and dependency structure, while it should not be.

Having said that I think that we could implement this feature (applying on "new" data) for directory dependencies, but it would need to be users' conscious decision to use it. One option could be, for example, flag-like: dvc run -d modified_folder -o spectrum_matrices_folder --incremental code.py Implementation won't be that easy though, because we should probably create some temporary dir that would store "only new" dependency files, and apply code to this temp dir and append output to already existing output of this stage.

Some notes:

After writing this, it seems to me that my proposition for this solution is wrong since it can easily be misused. Leaving this here for further discussion. Maybe we could implement this as advanced use case but that seems dangerously similar to --outs-preserve. Or maybe someone has a better idea of how it should be done.

shcheklein commented 4 years ago

@Pierre-Bartet @pared an interesting discussion. I also don't see a way how DVC being completely agnostic to the way code and files are organized now can solve this now. Thinking about the right abstraction for the job, I have first the question back:

Let's imaging we do:

$ dvc run --d my_collection computation.py

How does this script organized internally? How does it read the data from this my_collection?

I'm asking because right now like I mentioned we don't dictate any of that. But when we start doing things incrementally how do pass that delta information to the script? How will it know that it needs to read only certain files from the directory?

@Pierre-Bartet do you have some ideas that comes to your mind?

Pierre-Bartet commented 4 years ago

I've been thinking about it but I see no solution that would not totally change DVC's behaviour, mostly because of what you just said:

when we start doing things incrementally how do pass that delta information to the script?

The behaviour I had in mind was to have the dependencies given as arguments to the scripts (here computation.py) so that dvc run could either:

But that is indeed a huge deviation from the current dvc behaviour

markrowan commented 4 years ago

This is also a feature which I would like to see handled better with DVC if possible. Currently, the only way to avoid expensive re-computation on partial data sets when adding new data is to bypass DVC altogether, execute the processing on only the new files, and then run dvc commit on the results -- which seems rather to detract from the whole point of using DVC in the first place to track changes to datasets. And yet, it must be possible to avoid re-processing tens of thousands of files just because a single new file was added to an input directory.

@Pierre-Bartet, my thoughts for a possible behaviour were also in the same direction as yours, passing [delta] dependencies to the script being run.

A gentler approach along the same lines may be the following:

This is for sure "advanced" behaviour and requires users to be aware of the existence of the index. But at least it doesn't mean DVC having to change its behaviour in regard to how it calls commands with dvc run.

I realise this suggestion isn't necessarily entirely thought through, for which I apologise. Please feel free to tear it down where necessary :)

ghost commented 4 years ago

I find this feature really useful :sweat_smile: Not ML related but with ETL is pretty common to do such things. Having a cache is not useful if you are still recomputing files.

The simplest scenario that I can imagine is by giving the user a way to tell which files are new, for example:

mkdir data
for i in {1..3}; do echo $i > data/$i; done
dvc add data

When you add more files:

for i in {4..10}; do echo $i > data/i; done

Doing dvc ls --outs --new data.dvc would output:

4
5
6
7
8
9
10

that way the user would iterate over all of those files and run the respective command and then dvc commit data.

The implementation for ls --outs --new would be something like the following:

from dvc.repo import Repo
from dvc.stage import Stage

repo = Repo('.')
stage = Stage.load(repo, 'data.dvc')
out = stage.outs[0]
path = out.path_info

cached = set(path / info['relpath'] for info in out.dir_cache)
current = set(out.remote.walk_files(path))
new = cached - current

for path in new:
    print(new)

We could start with a simple implementation and add more features depending on users request, what do you think?


A lot of manual work, but if you want to process everything with a single call, like @Pierre-Bartet suggested in https://github.com/iterative/dvc/issues/331#issuecomment-555921791, you can link those files to a new directory and use it as an argument for your script:

new_stuff=$(mktemp -d)

for file in $(dvc ls --new data.dvc); do
    ln -s $(realpath $file) $new_stuff
done

python script.py $new_stuff && rm -rf $new_stuff
markrowan commented 4 years ago

This is especially frustrating now that I tried to implement my own incremental-processing workaround: I simply set an argument to my computation.py which, if true, doesn't touch any input file that already has a corresponding output file in the output.

I realise that this breaks the ability for DVC to know that all outputs were created by the command being run (but I already can't use DVC properly in my case anyway, since the only solution I have to avoid reprocessing all files every time a single file changes in the input is to manually run computation.py outside of DVC and dvc commit the results, which has the same breaking effect).

So I executed my pseudo-incremental version of computation.py on my input, only to discover that DVC automatically removes anything in the output location every time dvc run is called (without warning! -- see https://github.com/iterative/dvc/issues/2027 ) 😭

efiop commented 4 years ago

Hi @markrowan !

DVC deletes those outputs automatically only if it knows that it already has them in the cache, so you would be able to dvc checkout to get them back. We have an unofficial hidden flag for dvc run, that tells dvc to not remove the particular output before running the command. It is called --outs-persist ain CLI and persist: true in dvc-file itself. We don't document it because we are not quite sure if we are okay with how hacky it is and are afraid of people misusing it and then getting unreproducible pipelines 🙂 Please be aware of that, but feel free to give it a try and if you do, please make sure to let us know what you think about it. 🙂

dmpetrov commented 4 years ago

This is a great use-case. I even heard about this requirements for a couple of times outside of our issue tracker like "can DVC processes only new images?". It needs to be implemented as a part of DVC.

Challenges

There are some challenges with API and how to align the feature with the current DVC interface and which part goes to code:

Solutions

All of the above questions are great. Also, @Pierre-Bartet, @pared and @markrowan pointed to possible solutions. Some of the solutions might align well with DVC design.

Passing files as an argument (append to the args list) to the code is the best protocol between DVC and code that I see for now (I mean the first option with file_i):

The behaviour I had in mind was to have the dependencies given as arguments to the scripts (here computation.py) so that dvc run could either:

  • call repeteadly computation.py file_i
  • call a single time computation.py folder

How this can look from command line:

$ ls my_collection/
file_pig.jpg   file_cat.jpg   file_dog.jpg
$ dvc run --process-dependency my_collection/ --process-output processed/ python computation.py
# it runs the command for each of the file/dir with new/changed filename for each of the runs
#   python computation.py file_pig.jpg
#   python computation.py file_cat.jpg
#   python computation.py file_dog.jpg

What is important:

  1. DVC needs to understand that it works in "streaming" mode and it needs to be signaled to the tool by a separate command (not dvc run) or by a special set of parameters like in the example above --process-dependency/--process-output.
  2. DVC should create output dir for the first run when it was not yet created
  3. DVC should not remove the outputs dir before running/reproducing the commands
  4. Regular dependencies -d myscript.py -d libs/ might also be used
  5. Regular outputs cannot be specified -o logs/ (why not? any useful use case?). I feel like here is the place with another "deviation" from DVC.
  6. To make this command reproducible we need to make sure that DVC can get a checksum for each version of input dir. (how to get checksums for dirs? if not data dir? should we limit the command by data dirs?)
  7. Inside the input dir we should be able to work with files, dirs or both.
  8. Store all inputs and all outputs checksums (index from @markrowan comment). Probably the same way we do it now. Based on my understanding, we don't need to store mapping 'input_file_i --> processed_file_i' we just need a "checksum" of the entire input dir and each of the outputs.

Open design questions

Presenting result to users

When the use case will be implemented we need to show a nice looking use case in the documentation or a separate blog post. It's great to have this use case even before the release - it will help driving requirements and test the implementation.

@Pierre-Bartet @markrowan @kskyten could you please help us to define the use-case? The major questions:

  1. Dataset. Which dataset we should use? It should be open, and look natural, not like file_cat.jpg file_dog.jpg.
  2. Motivation. What kind of processing do we need to apply? If images, we should explain the motivation of not doing this processing on-flight or why the processing result should be stored. Heavy image processing - any examples of this?

If someone can participate or even drive this doc/blog-post activity it would be awesome.

Related issues

For a broader perspective, there are some issues that might be (or might not) related to this one. It would be great is we find some similarities in the scenarios and can reuse this.

  1. 1018 "build matrix". Difference: the build matrix has a predefined set of runs (and run parameters) while this issue takes all the files as the set of parameters. In theory, build matrix might be extended to this use case.

  2. 2825 pulling subset of files from a data-dir. It is kind of "interactive mode" of working with data folder. @Pierre-Bartet used this interaction-like trick in command $ dvc add new_file(s) --to my_collection

  3. Also, there was a verbal request from a user who asked for the ability to add a file into data-dir without pulling data (for optimization purposes). It is an inversion/push-version of #2825.
  4. ADDED 12/2/19: #755 - parallel job running. It seems like this streamming/processing case is a special case of parallel running.
Pierre-Bartet commented 4 years ago
  1. Regular outputs cannot be specified -o logs/ (why not? any useful use case?). I feel like here is the place with another "deviation" from DVC.

Yes both the input and the output will be some kind of new 'collection' objects.

  1. To make this command reproducible we need to make sure that DVC can get a checksum for each version of input dir. (how to get checksums for dirs? if not data dir? should we limit the command by data dirs?)

This can be done using hash lists or hash trees.

  1. Inside the input dir we should be able to work with files, dirs or both.

Therefore hash trees. Also trees can help if you want to be able to support folder with a large number of files (I've faced several use cases involving millions of files).

Related issues

Isn't all this also related to parallel processing ? (not a rhetoric question)

dmpetrov commented 4 years ago
  1. Inside the input dir we should be able to work with files, dirs or both.

Therefore hash trees. Also trees can help if you want to be able to support folder with a large number of files (I've faced several use cases involving millions of files).

Right. We are already using some kind of hashing in DVC. It would be great if the current hashing mechanism would be reused. The existing one might not work for not-data files and dirs.

Related issues

Isn't all this also related to parallel processing ? (not a rhetoric question)

Absolutely! Updated. It will require an additional command line parameter like --jobs 24 which is another argument for separating this into a separate command to not over pollute dvc run params list.

Suor commented 4 years ago

There is still a sub use case of training on top. For this you both need an old output (a pretrained model) and a set of added files (new inputs), which is essential to some deep learning, esp. images/nlp scenarios.

This, however, complicates things a lot:

Pierre-Bartet commented 4 years ago
  • an output is reused as a dep,

Isn't it already the case ?

  • there is no checksum reproducibility, because the final model depends on how we chunked the data.

Why ? In the situation you describe, the model is build from all the images, so it is not incremental: the training script must be applied to the whole image folder, not image by image.

Suor commented 4 years ago

Isn't it already the case ?

Not if you in fact process them 1 by 1, so that f(collection(items)) = collection(f(items)). Or at least you can work that around.

Why ? In the situation you describe, the model is build from all the images, so it is not incremental: the training script must be applied to the whole image folder, not image by image.

You train a model on a set of images, then you add a new batch and you train on a new batch starting from the pretrained model, not from random weights. This will result in a model knowing about images of both batches, but this is not the same as training on a combined set form the start, so we loose checksum reproducibility. This might be, however, close enough and faster to compute.

Suor commented 4 years ago

We can think of the difference as "do our transform function have this property":

f(chunk1 + chunk2, start=base) = f(chunk2, start=f(chunk1, start=base)) 

And also is it a strict = or a good enough ~ for a user.

Suor commented 4 years ago

We should also consider raising the priority. What do you think guys?

Pierre-Bartet commented 4 years ago

You train a model on a set of images, then you add a new batch and you train on a new batch starting from the pretrained model, not from random weights.

Ok my bad I misunderstood what you meant, so the flow will be like that:

image

As long as the dependency graph is a DAG there should be no checksum problem. Here maybe what is confusing is that both 'model 1' and 'model 2' could share the same key, for example 'model.pkl', but which one would you choose ? If you choose 'Model 1 + 2', then 'Model 1' will be inaccessible. It's a problem of hiding an intermediate result, there is no checksum problem here.

Suor commented 4 years ago

By nature of dvc both models will share the same filename, they will go in different git revisions though. Batches also combine into a single dep, in current design out = f(dep), that's it, history of a dep does not affect out, with incremental training it might affect.

Pierre-Bartet commented 4 years ago

Ok now I understand, there is 'currently' no way to distinguish between: image

and

image

But I am not sure to understand what 'currently' means here:

Batches also combine into a single dep

What batches, are they already implemented ? Are we talking about the current DVC status or the possible implementation of incremental situations ?

IMHO the use case you describe already works with DVC: you prepare 'model 1' with the data you have, then when you have more data you build a new 'model 1 + 2' from 'model 1' plus the new data. But as you said, if there are lots of batches, maybe some sugar can be added to hide 'model 1' and expose 'Model 1 + 2'.

Suor commented 4 years ago

What batches, are they already implemented ? Are we talking about the current DVC status or the possible implementation of incremental situations ?

You can add a dir with files into dvc, and then add some files to that dir and dvc add it again. It will have new checksum but same path, so you can rerun a pipeline with dvc repro on the new data, this will involve processing the whole thing of cause.

IMHO the use case you describe already works with DVC: you prepare 'model 1' with the data you have, then when you have more data you build a new 'model 1 + 2' from 'model 1' plus the new data. But as you said, if there are lots of batches, maybe some sugar can be added to hide 'model 1' and expose 'Model 1 + 2'.

You mean add a new stage to produce a new model? This is a dirty non-scalable workaround) You will need to duplicate everything for that:

dataA -> prepared_dataA -> modelA -> metricsA (this might be longer)
                           \
dataB -> prepared_dataB -> modelB -> metricsB
                             \
dataC -> prepared_dataC -> modelC -> metricsC

...

.. you die somewhere here ...

So you dup all outs and deps and stages. We can say that in dvc logic the data dep, might not be simply a dir of files, but a collection of batches (or a dir with some history), i.e. an incremental data artifact, then an input of a stage is also not a dir, but a collection of batches, then output will be reproducible, only to reproduce that you need to do:

f(chunkN, start=f(chunkN-1, start=...))

We will need probably an erase history command too. We still have an out as a dep situation though.

Pierre-Bartet commented 4 years ago

I've never encountered the situation you describe: either the number of batch was small, and indeed I wanted to train Model 1 + 2 using Model 1 + Batch 2, with a tractable number of batch and a need for being able to access Model 1 and model 2 separately, or the number of batch was big enough so that I wanted to retrain on all the batches (not incremental). But indeed your use case exists and it would be a nice feature.

I don't know about the exact DVC internal mechanism, but it seems to me that you (the front user) could write a pipeline stage like this:

f(chunkN, start=f(chunkN-1, start=...))

But under the hood it would be in reality:

dataA -> prepared_dataA -> modelA -> metricsA (this might be longer)
                           \
dataB -> prepared_dataB -> modelB -> metricsB
                             \
dataC -> prepared_dataC -> modelC -> metricsC

...

.. you don't die somewhere here because it is just automatically done ...
shcheklein commented 4 years ago

@Suor @Pierre-Bartet either I'm missing something or the case of running the stage on all images + pre-trained model can be solved by https://github.com/iterative/dvc/issues/331#issuecomment-559254859 (it's already in DVC but we don't merge docs for since don't like introducing "virtual" cycles into DAGs for now - so consider this semi-official).

The case from the ticket description is and a bunch of comments below is about actual Pachyderm-like incremental processing - when you run your script only on new files.

Just to clarify, which one those have you been discussing?

Suor commented 4 years ago

@shcheklein persist outs might help, we still need two things:

Pierre-Bartet commented 4 years ago

@shcheklein : Indeed what we've been discussing with @Suor is a different kind of 'incremental'

charlesbaynham commented 4 years ago

This issue describes our use case as well. We have raw data which is produced continuously. I intended to bin this into e.g. daily chunks, then run multiple stages of processing on each chunk, possibly using the inputs of other stages from the same time bin. My problem is that I don't want to reprocess years worth of data every day when a new file is added to the raw data, but I also don't want to manually create thousands of processing stages for every raw data file and all their children.

karajan1001 commented 4 years ago

I think this depends on the what the DVC think itself to be?

For me it's a data tracking tool in training staging, and data tracking and metrics comparing help me a lot when I make some experiments to the model, after that I use other tools to deploy my model.

So I use DVC as a data tracking tool like git, not a computation engine .I don't know what kind of tool, you guys want it to be

shcheklein commented 4 years ago

One more case and request - https://discordapp.com/channels/485586884165107732/563406153334128681/718108510671601824

Hi! My current main issue with DVC is the fact that the pipelines are structured with a stage = few inputs -> a few outputs. That works great on some cases, but it fail when I need to apply the same operation on many files (tens of thousands in my case). Eg running a ML model for detecting the keypoints of a person. Basically there is no map operation. Do you know if that is on the roadmap, or if there is a github issue about that?

Suor commented 4 years ago

So we might have map-style incremental processing and reduce-style one. The map seems like a lot simpler should we separate it?

shcheklein commented 4 years ago

@Suor it seems to me that both map or reduce would require some general mechanism first - how to apply certain logic only to new files. The way that logic being applied and actual output is being produced seems to be a simpler question to solve (and we even have some hacks like persistent outputs). Unless I'm missing something. Could you clarify, why do you think they are inherently different?

MatthieuBizien commented 4 years ago

I am the author of the comment above on Discord:

One more case and request - https://discordapp.com/channels/485586884165107732/563406153334128681/718108510671601824

I think it's also interesting to look at that use-case in relation with:

A potential different workaround would be to allow pipelines to create pipelines, so a "master" pipeline could create N different pipelines (one per file to process), and those would be executed one after the other. This would also allow very flexible workflows with dynamic pipeline reconfiguration based on scripts. "ML experiments and hyperparameters tuning" #2799 could be related. I can't find any issue related to that one, is it hidden or should I create it?

efiop commented 4 years ago

@MatthieuBizien Please feel free to create an issue for that :)

Pierre-Bartet commented 4 years ago

I think this depends on the what the DVC think itself to be?

For me it's a data tracking tool in training staging, and data tracking and metrics comparing help me a lot when I make some experiments to the model, after that I use other tools to deploy my model.

So I use DVC as a data tracking tool like git, not a computation engine .I don't know what kind of tool, you guys want it to be

I just want to use it for what its name stands for: "Data Version Control". The difficulty is that since we are talking about data, it can be so big that we don't want to actually commit it or have all of it on our computers (otherwise we would just use git and commit / push / pull / clone everything). Going from code (git) to data (dvc) means you want to be able to deal with only a subset of the full repository:

  1. You want to only partially commit the data (just its hash)
  2. You want to be able to have only a subset of the data on your computer (just its hash and a chosen subset of the full files, or even none of the full files)
  3. You want to be able to process only a subset of the data

I don't want it to be a computation engine but that is a direct consequence of this. If you have to rerun everything on the whole data every time you make the slightest change, IMHO that breaks the purpose of a data version control system i.e. point 3. is the missing feature.

jonilaserson commented 4 years ago

I hope this suggestion adds something to the above thread. Say I have 1M files in the directory ./data/pre, and a python script process_dir.py which goes over each file in ./data/pre and processes it and creates a file in the same name in a directory ./data/post. e.g. ./data/pre/123.txt --> ./data/post/123.txt

As we all know, if I define a pipeline:

dvc run -n process -d process_dir.py -d data/pre -o data/post python process_dir.py

then dvc repro will erase all content of data/post and re-run the script from scratch if I add or remove even one file from data/pre, which unfortunately means that those 1M files will unnecessarily be processed again - even though we keep cached copies of the outcomes.

suggestion: if we were able to define a rule that connects directly in the DAG pairs of data/pre/X.txt --> data/post/X.txt in the context of the process stage, then when can adjust the process stage in the pipeline as follows:

  1. identify which file-pairs haven't changed and remove those files to a temp dir
  2. run the process stage as you normally would.
  3. move the file-pairs from the temp dir back to their original locations.

It's like we are enhancing the DAG to include more local dependencies (and independencies), between input files and output files.

Suor commented 4 years ago

@jonilaserson the discussion here circles around that in a way. Your use case is however is a subset of whatever we discuss. It also has a workaround employing an undocumented --outs-persist feature, see above.

charlesbaynham commented 4 years ago

I'm using DVC for a project which processes continually produces time-series data in a series of steps. I described my use case above a little, but maybe I'll mention how I'm using it now.

My current system wraps DVC in code which produces DVC jobs based on the input data: it scans the input directories, creates a DVC file describing a job for each file that it finds unless a DVC file is already present and then runs DVC repro to compute any steps which aren't yet present.

The benefits of this structure are that, once the wrapper step has run, the repository is a fully valid DVC repository which doesn't need any special tool beyond DVC itself to interact with. In order to make this work, I needed to bin my data into preset chunks and have a way of mapping from inputs to outputs (since those outputs are then used in subsequent steps). In my time-series application, I did this by mandating filenames like "20200720_stepA_1.csv".

To bring this functionality into DVC itself, I'd imagine a couple of changes.

Meta jobs

I found the generation of DVC jobs to work quite well. You could imagine a "meta-job" which defines how input data should be processed to output data, and can then be used in later steps. Meta jobs can form their own DAG, even if there's no input data yet.

Running a metajob creates normal DVC jobs for each valid combination of input chunks present. A chunk is present if either

Metajobs can also depend on normal DVC jobs/files, in which case the dependency is passed to all generated jobs. All metajobs are run before normal jobs, to generate a set of normal jobs for all available input data. The normal jobs can then be run in the same way as they currently are.

Step to step mapping - inputs

The step-to-step mapping would need to be configurable. This would need to be done such that subsequent steps can use the outputs of previous ones. You'd need to decide "do I always map a single step output to a single step input?" I.e. if my input source (call it "stepA") has data like this:

20200718_stepA_0.csv
20200718_stepA_1.csv
20200719_stepA_0.csv
20200720_stepA_0.csv
20200720_stepA_1.csv
20200720_stepA_2.csv

And "stepB" depends on "stepA", should "stepB" be called three times (for the three days, with all inputs for a given day passes as inputs to stepB), six times (for each file, passed individually) or once (easier implemented by passing the whole folder, as in the current version of DVC).

You might achieve this by e.g. defining a regex for inputs. For stepB you might store the regex ^(\d{8})_stepA_\d.csv$ in your step config. This matches all the desired inputs.

For case 2 (stepB should be called six times), this would be enough. For case 1 (stepB should be called three times, with grouped inputs) you might also specify

groupBy:
  - 1

where this tells DVC to group by the contents of capture group 1 in the regex (i.e. the timestamp).

Step to step mapping - outputs

The outputs would also need defining. "stepB" might only have a single output, with the form $1_stepB_0.csv. This is a regex replacement string, and should resolve to a specific, single filename for any of the inputs which match the input regex.

Input passing

Since your steps are being called with different inputs each time, you'll also need some way of passing these inputs to the command that's being called. That might involve a substitution token in the command, like python process_files.py %INPUT_1% or, for a variable-length list of input files, python process_files.py %INPUTS_1% with an additional field input_1_format: "-f %INPUT% ", resulting in a call like python process_files.py -f file1.csv -f file2.csv -f file3.csv.

That's a bit ugly right now, I'm sure some thought could make it more elegant.

Summary

That's a bit of a brain-dump and I'm sorry if it's not totally coherent. I thought I'd document my experience with adding this kind of functionality on in case it informs the requirements for adding this into DVC itself.

I'd summarise my suggestion by: